lance_encoding/previous/encodings/logical/

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::VecDeque, sync::Arc, vec};

use arrow_array::{
    cast::AsArray, types::UInt64Type, Array, ArrayRef, LargeBinaryArray, PrimitiveArray,
    StructArray, UInt64Array,
};
use arrow_buffer::{
    BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use arrow_schema::DataType;
use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use snafu::location;

use lance_core::{datatypes::BLOB_DESC_FIELDS, Error, Result};

use crate::{
    buffer::LanceBuffer,
    decoder::{
        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
        ScheduledScanLine, SchedulerContext,
    },
    encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
    format::pb::{column_encoding, Blob, ColumnEncoding},
    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
    repdef::RepDefBuilder,
    EncodingsIo,
};

/// A field scheduler for large binary data
///
/// Large binary data (1MiB+) can be inefficient to store as a regular primitive
/// array.  We essentially end up with one page per row (or per few rows), and the
/// metadata overhead can be significant.
///
/// At the same time, the benefit of pages (contiguous arrays) is small, since we
/// can generally perform random access at these sizes without much penalty.
///
/// This encoder gives up random access and stores the large binary data out of
/// line, which keeps the metadata small.
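///
/// Each row is represented in the column by a small `(position, size)` descriptor
/// pointing at the out-of-line bytes.  Non-empty values record the offset and
/// length of their data, empty values are written as `(0, 0)`, and nulls as
/// `(1, 0)` (see [`BlobFieldEncoder::write_bins`]).  For example, the values
/// `[b"abc", b"", null]` would yield the descriptors `[(p, 3), (0, 0), (1, 0)]`
/// for some buffer position `p`.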
#[derive(Debug)]
pub struct BlobFieldScheduler {
    descriptions_scheduler: Arc<dyn FieldScheduler>,
}

impl BlobFieldScheduler {
    pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
        Self {
            descriptions_scheduler,
        }
    }
}

#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}

impl SchedulingJob for BlobFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
        let mut priority = priority.current_priority();
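        // Each page of descriptions becomes a BlobFieldDecoder that fetches the
        // out-of-line bytes once its descriptions have loaded.  The priority is
        // advanced by each page's row count so later pages keep their ordering.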
        let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
            let decoder = decoder.into_legacy();
            let path = decoder.path;
            let mut decoder = decoder.decoder;
            let num_rows = decoder.num_rows();
            let descriptions_fut = async move {
                decoder
                    .wait_for_loaded(decoder.num_rows() - 1)
                    .await
                    .unwrap();
                let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
                descriptions_task.task.decode()
            }
            .boxed();
            let decoder = Box::new(BlobFieldDecoder {
                io: context.io().clone(),
                unloaded_descriptions: Some(descriptions_fut),
                positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                num_rows,
                loaded: VecDeque::new(),
                validity: VecDeque::new(),
                rows_loaded: 0,
                rows_drained: 0,
                base_priority: priority,
            });
            priority += num_rows;
            MessageType::DecoderReady(DecoderReady { decoder, path })
        });
        Ok(ScheduledScanLine {
            decoders: decoders.collect(),
            rows_scheduled: next_descriptions.rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_job.num_rows()
    }
}

impl FieldScheduler for BlobFieldScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[std::ops::Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        let descriptions_job = self
            .descriptions_scheduler
            .schedule_ranges(ranges, filter)?;
        Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        self.descriptions_scheduler.initialize(filter, context)
    }
}

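/// A decoder that first loads the `(position, size)` descriptors and then
/// fetches the out-of-line bytes they point to.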
pub struct BlobFieldDecoder {
    io: Arc<dyn EncodingsIo>,
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    positions: PrimitiveArray<UInt64Type>,
    sizes: PrimitiveArray<UInt64Type>,
    num_rows: u64,
    loaded: VecDeque<Bytes>,
    validity: VecDeque<BooleanBuffer>,
    rows_loaded: u64,
    rows_drained: u64,
    base_priority: u64,
}

impl BlobFieldDecoder {
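    /// Drains `num_values` validity bits from the queued buffers, returning
    /// `None` if every drained value is valid.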
    fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
        let mut validity = BooleanBufferBuilder::new(num_values);
        let mut remaining = num_values;
        while remaining > 0 {
            let next = self.validity.front_mut().unwrap();
            if remaining < next.len() {
                let slice = next.slice(0, remaining);
                validity.append_buffer(&slice);
                *next = next.slice(remaining, next.len() - remaining);
                remaining = 0;
            } else {
                validity.append_buffer(next);
                remaining -= next.len();
                self.validity.pop_front();
            }
        }
        let nulls = NullBuffer::new(validity.finish());
        if nulls.null_count() == 0 {
            Ok(None)
        } else {
            Ok(Some(nulls))
        }
    }
}

impl std::fmt::Debug for BlobFieldDecoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlobFieldDecoder")
            .field("num_rows", &self.num_rows)
            .field("rows_loaded", &self.rows_loaded)
            .field("rows_drained", &self.rows_drained)
            .finish()
    }
}

impl LogicalPageDecoder for BlobFieldDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            if self.unloaded_descriptions.is_some() {
                let descriptions = self.unloaded_descriptions.take().unwrap().await?;
                let descriptions = descriptions.as_struct();
                self.positions = descriptions.column(0).as_primitive().clone();
                self.sizes = descriptions.column(1).as_primitive().clone();
            }
            let start = self.rows_loaded as usize;
            let end = (loaded_need + 1).min(self.num_rows) as usize;
            let positions = self.positions.values().slice(start, end - start);
            let sizes = self.sizes.values().slice(start, end - start);
            let ranges = positions
                .iter()
                .zip(sizes.iter())
                .map(|(position, size)| *position..(*position + *size))
                .collect::<Vec<_>>();
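            // A (1, 0) descriptor is the null sentinel written by write_bins;
            // anything else, including the (0, 0) used for empty values, is valid.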
            let validity = positions
                .iter()
                .zip(sizes.iter())
                .map(|(p, s)| *p != 1 || *s != 0)
                .collect::<BooleanBuffer>();
            self.validity.push_back(validity);
            self.rows_loaded = end as u64;
            let bytes = self
                .io
                .submit_request(ranges, self.base_priority + start as u64)
                .await?;
            self.loaded.extend(bytes);
            Ok(())
        }
        .boxed()
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
        let validity = self.drain_validity(num_rows as usize)?;
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
        })
    }

    fn data_type(&self) -> &DataType {
        &DataType::LargeBinary
    }
}

struct BlobArrayDecodeTask {
    bytes: Vec<Bytes>,
    validity: Option<NullBuffer>,
}

impl BlobArrayDecodeTask {
    fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
        Self { bytes, validity }
    }
}

impl DecodeArrayTask for BlobArrayDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
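        // Rebuild the offsets buffer: a running start offset for each value,
        // with the total byte count appended as the final offset.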
        let offsets = self
            .bytes
            .iter()
            .scan(0, |state, b| {
                let start = *state;
                *state += b.len();
                Some(start as i64)
            })
            .chain(std::iter::once(num_bytes as i64))
            .collect::<Vec<_>>();
        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
        let mut buffer = Vec::with_capacity(num_bytes);
        for bytes in self.bytes {
            buffer.extend_from_slice(&bytes);
        }
        let data_buf = Buffer::from_vec(buffer);
        Ok(Arc::new(LargeBinaryArray::new(
            offsets,
            data_buf,
            self.validity,
        )))
    }
}

pub struct BlobFieldEncoder {
    description_encoder: Box<dyn FieldEncoder>,
}

impl BlobFieldEncoder {
    pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
        Self {
            description_encoder,
        }
    }

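    /// Writes each non-empty value to the external buffers and returns a struct
    /// array of `(position, size)` descriptors.  Empty values are encoded as
    /// `(0, 0)` and nulls as `(1, 0)`.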
    fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
        let binarray = array
            .as_binary_opt::<i64>()
            .ok_or_else(|| Error::InvalidInput {
                source: format!("Expected large_binary and received {}", array.data_type()).into(),
                location: location!(),
            })?;
        let mut positions = Vec::with_capacity(array.len());
        let mut sizes = Vec::with_capacity(array.len());
        let data = binarray.values();
        let nulls = binarray
            .nulls()
            .cloned()
            .unwrap_or(NullBuffer::new_valid(binarray.len()));
        for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
            if is_valid {
                let start = w[0] as u64;
                let end = w[1] as u64;
                let size = end - start;
                if size > 0 {
                    let val = data.slice_with_length(start as usize, size as usize);
                    let position = external_buffers.add_buffer(LanceBuffer::from(val));
                    positions.push(position);
                    sizes.push(size);
                } else {
                    // Empty values are always (0, 0)
                    positions.push(0);
                    sizes.push(0);
                }
            } else {
                // Null values are always (1, 0)
                positions.push(1);
                sizes.push(0);
            }
        }
        let positions = Arc::new(UInt64Array::from(positions));
        let sizes = Arc::new(UInt64Array::from(sizes));
        let descriptions = Arc::new(StructArray::new(
            BLOB_DESC_FIELDS.clone(),
            vec![positions, sizes],
            None,
        ));
        Ok(descriptions)
    }
}

impl FieldEncoder for BlobFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let descriptions = Self::write_bins(array, external_buffers)?;
        self.description_encoder.maybe_encode(
            descriptions,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )
    }

    // If there is any data left in the buffer, create an encode task from it
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.description_encoder.flush(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.description_encoder.num_columns()
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let inner_finished = self.description_encoder.finish(external_buffers);
        async move {
            let mut cols = inner_finished.await?;
            assert_eq!(cols.len(), 1);
            let encoding = std::mem::take(&mut cols[0].encoding);
            let wrapped_encoding = ColumnEncoding {
                column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
                    inner: Some(Box::new(encoding)),
                }))),
            };
            cols[0].encoding = wrapped_encoding;
            Ok(cols)
        }
        .boxed()
    }
}

#[cfg(test)]
pub mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::LargeBinaryArray;
    use arrow_schema::{DataType, Field};
    use lance_core::datatypes::BLOB_META_KEY;

    use crate::{
        format::pb::column_encoding,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    static BLOB_META: LazyLock<HashMap<String, String>> = LazyLock::new(|| {
        [(BLOB_META_KEY.to_string(), "true".to_string())]
            .iter()
            .cloned()
            .collect::<HashMap<_, _>>()
    });

    #[test_log::test(tokio::test)]
    async fn test_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        let test_cases = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];
            assert!(matches!(
                col.encoding.column_encoding.as_ref().unwrap(),
                column_encoding::ColumnEncoding::Blob(_)
            ));
        }));
        // Use blob encoding if requested
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        let test_cases = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let col = &cols[0];
            assert!(!matches!(
                col.encoding.column_encoding.as_ref().unwrap(),
                column_encoding::ColumnEncoding::Blob(_)
            ));
        }));
        // Don't use blob encoding if not requested
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }
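
    // A minimal sketch exercising the descriptor convention: empty values are
    // written as (0, 0) and nulls as (1, 0), so both should survive a round
    // trip through the blob encoding.  Assumes check_round_trip_encoding_of_data
    // verifies both values and validity, as the tests above do.
    #[test_log::test(tokio::test)]
    async fn test_blob_empty_and_null() {
        let val: &[u8] = &[1, 2, 3];
        let empty: &[u8] = &[];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val), Some(empty), None]));
        check_round_trip_encoding_of_data(vec![array], &TestCases::default(), BLOB_META.clone())
            .await;
    }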
}