// lance_encoding/previous/encodings/logical/blob.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::VecDeque, sync::Arc, vec};
5
6use arrow_array::{
7    Array, ArrayRef, LargeBinaryArray, PrimitiveArray, StructArray, UInt64Array, cast::AsArray,
8    types::UInt64Type,
9};
10use arrow_buffer::{
11    BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
12};
13use arrow_schema::DataType;
14use bytes::Bytes;
15use futures::{FutureExt, future::BoxFuture};
16
17use lance_core::{Error, Result, datatypes::BLOB_DESC_FIELDS};
18
19use crate::{
20    EncodingsIo,
21    buffer::LanceBuffer,
22    decoder::{
23        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
24        ScheduledScanLine, SchedulerContext,
25    },
26    encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
27    format::pb::{Blob, ColumnEncoding, column_encoding},
28    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
29    repdef::RepDefBuilder,
30};
31
/// A field scheduler for large binary data
///
/// Large binary data (1MiB+) can be inefficient if we store as a regular primitive.  We
/// essentially end up with 1 page per row (or a few rows) and the overhead of the
/// metadata can be significant.
///
/// At the same time the benefits of using pages (contiguous arrays) are pretty small since
/// we can generally perform random access at these sizes without much penalty.
///
/// This encoder gives up the random access and stores the large binary data out of line.  This
/// keeps the metadata small.
#[derive(Debug)]
pub struct BlobFieldScheduler {
    /// Schedules the (position, size) description column; the blob bytes
    /// themselves are fetched separately by the blob decoder.
    descriptions_scheduler: Arc<dyn FieldScheduler>,
}
47
48impl BlobFieldScheduler {
49    pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
50        Self {
51            descriptions_scheduler,
52        }
53    }
54}
55
/// Scheduling job that wraps the description column's job; each scheduled
/// description decoder is re-wrapped as a blob decoder.
#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}
60
impl SchedulingJob for BlobFieldSchedulingJob<'_> {
    /// Schedules the next batch of description pages and wraps each resulting
    /// decoder in a `BlobFieldDecoder` that will fetch the blob bytes.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
        let mut priority = priority.current_priority();
        let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
            let decoder = decoder.into_legacy();
            let path = decoder.path;
            let mut decoder = decoder.decoder;
            let num_rows = decoder.num_rows();
            // Future that fully loads and decodes the description page into a
            // (position, size) array.  NOTE(review): the unwraps assume the
            // description page always loads and drains cleanly — confirm that
            // failures here can only be bugs, not recoverable I/O errors.
            let descriptions_fut = async move {
                decoder
                    .wait_for_loaded(decoder.num_rows() - 1)
                    .await
                    .unwrap();
                let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
                descriptions_task.task.decode()
            }
            .boxed();
            let decoder = Box::new(BlobFieldDecoder {
                io: context.io().clone(),
                unloaded_descriptions: Some(descriptions_fut),
                // Positions/sizes start empty; they are filled in on the first
                // `wait_for_loaded` call when the descriptions future resolves
                positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                num_rows,
                loaded: VecDeque::new(),
                validity: VecDeque::new(),
                rows_loaded: 0,
                rows_drained: 0,
                base_priority: priority,
            });
            // Each page's decoder gets a base priority offset by the rows that
            // precede it (side effect runs when `decoders` is collected below)
            priority += num_rows;
            MessageType::DecoderReady(DecoderReady { decoder, path })
        });
        Ok(ScheduledScanLine {
            decoders: decoders.collect(),
            rows_scheduled: next_descriptions.rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_job.num_rows()
    }
}
108
109impl FieldScheduler for BlobFieldScheduler {
110    fn schedule_ranges<'a>(
111        &'a self,
112        ranges: &[std::ops::Range<u64>],
113        filter: &FilterExpression,
114    ) -> Result<Box<dyn SchedulingJob + 'a>> {
115        let descriptions_job = self
116            .descriptions_scheduler
117            .schedule_ranges(ranges, filter)?;
118        Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
119    }
120
121    fn num_rows(&self) -> u64 {
122        self.descriptions_scheduler.num_rows()
123    }
124
125    fn initialize<'a>(
126        &'a self,
127        filter: &'a FilterExpression,
128        context: &'a SchedulerContext,
129    ) -> BoxFuture<'a, Result<()>> {
130        self.descriptions_scheduler.initialize(filter, context)
131    }
132}
133
pub struct BlobFieldDecoder {
    /// I/O used to fetch the out-of-line blob bytes
    io: Arc<dyn EncodingsIo>,
    /// Pending decode of the (position, size) description page; taken (and
    /// awaited) on the first `wait_for_loaded` call
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    /// File positions of each blob, populated from the descriptions
    positions: PrimitiveArray<UInt64Type>,
    /// Byte sizes of each blob, populated from the descriptions
    sizes: PrimitiveArray<UInt64Type>,
    num_rows: u64,
    /// Fetched blob bytes, one entry per loaded row, in row order
    loaded: VecDeque<Bytes>,
    /// Validity buffers accumulated per `wait_for_loaded` call
    validity: VecDeque<BooleanBuffer>,
    rows_loaded: u64,
    rows_drained: u64,
    /// Priority of this page's first row, added to the row offset when
    /// submitting I/O requests
    base_priority: u64,
}
146
147impl BlobFieldDecoder {
148    fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
149        let mut validity = BooleanBufferBuilder::new(num_values);
150        let mut remaining = num_values;
151        while remaining > 0 {
152            let next = self.validity.front_mut().unwrap();
153            if remaining < next.len() {
154                let slice = next.slice(0, remaining);
155                validity.append_buffer(&slice);
156                *next = next.slice(remaining, next.len() - remaining);
157                remaining = 0;
158            } else {
159                validity.append_buffer(next);
160                remaining -= next.len();
161                self.validity.pop_front();
162            }
163        }
164        let nulls = NullBuffer::new(validity.finish());
165        if nulls.null_count() == 0 {
166            Ok(None)
167        } else {
168            Ok(Some(nulls))
169        }
170    }
171}
172
// Manual Debug impl: `unloaded_descriptions` holds a boxed future, which does
// not implement Debug, so we only report the row-count bookkeeping.
impl std::fmt::Debug for BlobFieldDecoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlobFieldDecoder")
            .field("num_rows", &self.num_rows)
            .field("rows_loaded", &self.rows_loaded)
            .field("rows_drained", &self.rows_drained)
            .finish()
    }
}
182
impl LogicalPageDecoder for BlobFieldDecoder {
    /// Ensures rows `[rows_loaded, loaded_need]` have been fetched.
    ///
    /// On the first call this awaits the description page to obtain the
    /// positions/sizes, then submits a single I/O request with one byte range
    /// per newly needed row.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            // Lazily resolve the descriptions future created by the scheduler
            if self.unloaded_descriptions.is_some() {
                let descriptions = self.unloaded_descriptions.take().unwrap().await?;
                let descriptions = descriptions.as_struct();
                self.positions = descriptions.column(0).as_primitive().clone();
                self.sizes = descriptions.column(1).as_primitive().clone();
            }
            let start = self.rows_loaded as usize;
            // `loaded_need` is an inclusive row index, hence the +1
            let end = (loaded_need + 1).min(self.num_rows) as usize;
            let positions = self.positions.values().slice(start, end - start);
            let sizes = self.sizes.values().slice(start, end - start);
            // One byte range per row; null (1, 0) and empty (0, 0) rows
            // produce zero-length ranges
            let ranges = positions
                .iter()
                .zip(sizes.iter())
                .map(|(position, size)| *position..(*position + *size))
                .collect::<Vec<_>>();
            // (position, size) == (1, 0) is the sentinel for a null value
            // (see BlobFieldEncoder::write_bins)
            let validity = positions
                .iter()
                .zip(sizes.iter())
                .map(|(p, s)| *p != 1 || *s != 0)
                .collect::<BooleanBuffer>();
            self.validity.push_back(validity);
            self.rows_loaded = end as u64;
            let bytes = self
                .io
                .submit_request(ranges, self.base_priority + start as u64)
                .await?;
            self.loaded.extend(bytes);
            Ok(())
        }
        .boxed()
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    /// Pops the next `num_rows` fetched buffers (one per row) and their
    /// validity into a decode task.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
        let validity = self.drain_validity(num_rows as usize)?;
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
        })
    }

    fn data_type(&self) -> &DataType {
        &DataType::LargeBinary
    }
}
244
245struct BlobArrayDecodeTask {
246    bytes: Vec<Bytes>,
247    validity: Option<NullBuffer>,
248}
249
250impl BlobArrayDecodeTask {
251    fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
252        Self { bytes, validity }
253    }
254}
255
256impl DecodeArrayTask for BlobArrayDecodeTask {
257    fn decode(self: Box<Self>) -> Result<ArrayRef> {
258        let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
259        let offsets = self
260            .bytes
261            .iter()
262            .scan(0, |state, b| {
263                let start = *state;
264                *state += b.len();
265                Some(start as i64)
266            })
267            .chain(std::iter::once(num_bytes as i64))
268            .collect::<Vec<_>>();
269        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
270        let mut buffer = Vec::with_capacity(num_bytes);
271        for bytes in self.bytes {
272            buffer.extend_from_slice(&bytes);
273        }
274        let data_buf = Buffer::from_vec(buffer);
275        Ok(Arc::new(LargeBinaryArray::new(
276            offsets,
277            data_buf,
278            self.validity,
279        )))
280    }
281}
282
283pub struct BlobFieldEncoder {
284    description_encoder: Box<dyn FieldEncoder>,
285}
286
287impl BlobFieldEncoder {
288    pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
289        Self {
290            description_encoder,
291        }
292    }
293
294    fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
295        let binarray = array.as_binary_opt::<i64>().ok_or_else(|| {
296            Error::invalid_input_source(
297                format!("Expected large_binary and received {}", array.data_type()).into(),
298            )
299        })?;
300        let mut positions = Vec::with_capacity(array.len());
301        let mut sizes = Vec::with_capacity(array.len());
302        let data = binarray.values();
303        let nulls = binarray
304            .nulls()
305            .cloned()
306            .unwrap_or(NullBuffer::new_valid(binarray.len()));
307        for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
308            if is_valid {
309                let start = w[0] as u64;
310                let end = w[1] as u64;
311                let size = end - start;
312                if size > 0 {
313                    let val = data.slice_with_length(start as usize, size as usize);
314                    let position = external_buffers.add_buffer(LanceBuffer::from(val));
315                    positions.push(position);
316                    sizes.push(size);
317                } else {
318                    // Empty values are always (0,0)
319                    positions.push(0);
320                    sizes.push(0);
321                }
322            } else {
323                // Null values are always (1, 0)
324                positions.push(1);
325                sizes.push(0);
326            }
327        }
328        let positions = Arc::new(UInt64Array::from(positions));
329        let sizes = Arc::new(UInt64Array::from(sizes));
330        let descriptions = Arc::new(StructArray::new(
331            BLOB_DESC_FIELDS.clone(),
332            vec![positions, sizes],
333            None,
334        ));
335        Ok(descriptions)
336    }
337}
338
339impl FieldEncoder for BlobFieldEncoder {
340    fn maybe_encode(
341        &mut self,
342        array: ArrayRef,
343        external_buffers: &mut OutOfLineBuffers,
344        repdef: RepDefBuilder,
345        row_number: u64,
346        num_rows: u64,
347    ) -> Result<Vec<EncodeTask>> {
348        let descriptions = Self::write_bins(array, external_buffers)?;
349        self.description_encoder.maybe_encode(
350            descriptions,
351            external_buffers,
352            repdef,
353            row_number,
354            num_rows,
355        )
356    }
357
358    // If there is any data left in the buffer then create an encode task from it
359    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
360        self.description_encoder.flush(external_buffers)
361    }
362
363    fn num_columns(&self) -> u32 {
364        self.description_encoder.num_columns()
365    }
366
367    fn finish(
368        &mut self,
369        external_buffers: &mut OutOfLineBuffers,
370    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
371        let inner_finished = self.description_encoder.finish(external_buffers);
372        async move {
373            let mut cols = inner_finished.await?;
374            assert_eq!(cols.len(), 1);
375            let encoding = std::mem::take(&mut cols[0].encoding);
376            let wrapped_encoding = ColumnEncoding {
377                column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
378                    inner: Some(Box::new(encoding)),
379                }))),
380            };
381            cols[0].encoding = wrapped_encoding;
382            Ok(cols)
383        }
384        .boxed()
385    }
386}
387
#[cfg(test)]
pub mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::LargeBinaryArray;
    use arrow_schema::{DataType, Field};
    use lance_arrow::BLOB_META_KEY;

    use crate::{
        format::pb::column_encoding,
        testing::{TestCases, check_round_trip_encoding_of_data, check_specific_random},
        version::LanceFileVersion,
    };

    // Field metadata that marks a column as blob-encoded
    static BLOB_META: LazyLock<HashMap<String, String>> = LazyLock::new(|| {
        [(BLOB_META_KEY.to_string(), "true".to_string())]
            .iter()
            .cloned()
            .collect::<HashMap<_, _>>()
    });

    // Round-trips randomly generated large binary data through the blob path
    #[test_log::test(tokio::test)]
    async fn test_basic_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_specific_random(
            field,
            TestCases::basic().with_max_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    // Round-trips a small fixed array (including a null) and checks which
    // column encoding is chosen with and without the blob metadata flag
    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        let test_cases = TestCases::default()
            .with_max_file_version(LanceFileVersion::V2_1)
            .with_expected_encoding("packed_struct")
            .with_verify_encoding(Arc::new(|cols, version| {
                if version < &LanceFileVersion::V2_1 {
                    // In 2.0 we used a special "column encoding" to mark blob fields.  In 2.1 we
                    // don't do this and just rely on the regular page encoding.
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        // Use blob encoding if requested
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_verify_encoding(Arc::new(|cols, version| {
                if version < &LanceFileVersion::V2_1 {
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    // Without the blob metadata the legacy Blob column
                    // encoding must NOT be used
                    assert!(!matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        // Don't use blob encoding if not requested
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }
}