// lance_encoding/previous/encodings/logical/blob.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::VecDeque, sync::Arc, vec};

use arrow_array::{
    Array, ArrayRef, LargeBinaryArray, PrimitiveArray, StructArray, UInt64Array, cast::AsArray,
    types::UInt64Type,
};
use arrow_buffer::{
    BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use arrow_schema::DataType;
use bytes::Bytes;
use futures::{FutureExt, future::BoxFuture};

use lance_core::{Error, Result, datatypes::BLOB_DESC_FIELDS};

use crate::{
    EncodingsIo,
    buffer::LanceBuffer,
    decoder::{
        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
        ScheduledScanLine, SchedulerContext,
    },
    encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
    format::pb::{Blob, ColumnEncoding, column_encoding},
    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
    repdef::RepDefBuilder,
};

/// A field scheduler for large binary data
///
/// Large binary data (1MiB+) can be inefficient if we store as a regular primitive.  We
/// essentially end up with 1 page per row (or a few rows) and the overhead of the
/// metadata can be significant.
///
/// At the same time the benefits of using pages (contiguous arrays) are pretty small since
/// we can generally perform random access at these sizes without much penalty.
///
/// This encoder gives up the random access and stores the large binary data out of line.  This
/// keeps the metadata small.
#[derive(Debug)]
pub struct BlobFieldScheduler {
    /// Scheduler for the inner (position, size) descriptions column
    descriptions_scheduler: Arc<dyn FieldScheduler>,
}

impl BlobFieldScheduler {
    /// Creates a blob scheduler wrapping the scheduler for the descriptions column
    pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
        Self {
            descriptions_scheduler,
        }
    }
}

/// Scheduling job that delegates to the descriptions column's job and wraps
/// each resulting decoder in a `BlobFieldDecoder`
#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}

61impl SchedulingJob for BlobFieldSchedulingJob<'_> {
62    fn schedule_next(
63        &mut self,
64        context: &mut SchedulerContext,
65        priority: &dyn PriorityRange,
66    ) -> Result<ScheduledScanLine> {
67        let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
68        let mut priority = priority.current_priority();
69        let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
70            let decoder = decoder.into_legacy();
71            let path = decoder.path;
72            let mut decoder = decoder.decoder;
73            let num_rows = decoder.num_rows();
74            let descriptions_fut = async move {
75                decoder
76                    .wait_for_loaded(decoder.num_rows() - 1)
77                    .await
78                    .unwrap();
79                let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
80                descriptions_task.task.decode().map(|(arr, _)| arr)
81            }
82            .boxed();
83            let decoder = Box::new(BlobFieldDecoder {
84                io: context.io().clone(),
85                unloaded_descriptions: Some(descriptions_fut),
86                positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
87                sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
88                num_rows,
89                loaded: VecDeque::new(),
90                validity: VecDeque::new(),
91                rows_loaded: 0,
92                rows_drained: 0,
93                base_priority: priority,
94            });
95            priority += num_rows;
96            MessageType::DecoderReady(DecoderReady { decoder, path })
97        });
98        Ok(ScheduledScanLine {
99            decoders: decoders.collect(),
100            rows_scheduled: next_descriptions.rows_scheduled,
101        })
102    }
103
104    fn num_rows(&self) -> u64 {
105        self.descriptions_job.num_rows()
106    }
107}
108
109impl FieldScheduler for BlobFieldScheduler {
110    fn schedule_ranges<'a>(
111        &'a self,
112        ranges: &[std::ops::Range<u64>],
113        filter: &FilterExpression,
114    ) -> Result<Box<dyn SchedulingJob + 'a>> {
115        let descriptions_job = self
116            .descriptions_scheduler
117            .schedule_ranges(ranges, filter)?;
118        Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
119    }
120
121    fn num_rows(&self) -> u64 {
122        self.descriptions_scheduler.num_rows()
123    }
124
125    fn initialize<'a>(
126        &'a self,
127        filter: &'a FilterExpression,
128        context: &'a SchedulerContext,
129    ) -> BoxFuture<'a, Result<()>> {
130        self.descriptions_scheduler.initialize(filter, context)
131    }
132}
133
/// Decoder that reconstructs blob values from out-of-line storage using
/// previously decoded (position, size) descriptions
pub struct BlobFieldDecoder {
    // I/O interface used to fetch the blob bytes
    io: Arc<dyn EncodingsIo>,
    // Future resolving to the decoded descriptions array; taken on first load
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    // Byte offset of each value (populated once the descriptions resolve)
    positions: PrimitiveArray<UInt64Type>,
    // Byte length of each value
    sizes: PrimitiveArray<UInt64Type>,
    num_rows: u64,
    // Fetched blob payloads, in row order, not yet drained
    loaded: VecDeque<Bytes>,
    // Validity buffers accumulated one per load batch
    validity: VecDeque<BooleanBuffer>,
    rows_loaded: u64,
    rows_drained: u64,
    // Priority of the first row of this page, used when submitting I/O
    base_priority: u64,
}

147impl BlobFieldDecoder {
148    fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
149        let mut validity = BooleanBufferBuilder::new(num_values);
150        let mut remaining = num_values;
151        while remaining > 0 {
152            let next = self.validity.front_mut().unwrap();
153            if remaining < next.len() {
154                let slice = next.slice(0, remaining);
155                validity.append_buffer(&slice);
156                *next = next.slice(remaining, next.len() - remaining);
157                remaining = 0;
158            } else {
159                validity.append_buffer(next);
160                remaining -= next.len();
161                self.validity.pop_front();
162            }
163        }
164        let nulls = NullBuffer::new(validity.finish());
165        if nulls.null_count() == 0 {
166            Ok(None)
167        } else {
168            Ok(Some(nulls))
169        }
170    }
171}
172
// Manual Debug: the decoder holds futures and raw byte buffers that are not
// themselves Debug, so only the progress counters are reported.
impl std::fmt::Debug for BlobFieldDecoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BlobFieldDecoder")
            .field("num_rows", &self.num_rows)
            .field("rows_loaded", &self.rows_loaded)
            .field("rows_drained", &self.rows_drained)
            .finish()
    }
}

impl LogicalPageDecoder for BlobFieldDecoder {
    /// Ensures rows up to and including `loaded_need` have their bytes fetched.
    ///
    /// On the first call this resolves the descriptions future to obtain the
    /// (position, size) pairs, then issues a single I/O request covering all
    /// newly needed rows.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            if self.unloaded_descriptions.is_some() {
                let descriptions = self.unloaded_descriptions.take().unwrap().await?;
                let descriptions = descriptions.as_struct();
                // Column 0 holds positions, column 1 holds sizes
                self.positions = descriptions.column(0).as_primitive().clone();
                self.sizes = descriptions.column(1).as_primitive().clone();
            }
            // Only the not-yet-loaded rows [rows_loaded, loaded_need] are fetched
            let start = self.rows_loaded as usize;
            let end = (loaded_need + 1).min(self.num_rows) as usize;
            let positions = self.positions.values().slice(start, end - start);
            let sizes = self.sizes.values().slice(start, end - start);
            let ranges = positions
                .iter()
                .zip(sizes.iter())
                .map(|(position, size)| *position..(*position + *size))
                .collect::<Vec<_>>();
            // Null values are stored with the sentinel pair (position=1, size=0);
            // see the matching logic in BlobFieldEncoder::write_bins
            let validity = positions
                .iter()
                .zip(sizes.iter())
                .map(|(p, s)| *p != 1 || *s != 0)
                .collect::<BooleanBuffer>();
            self.validity.push_back(validity);
            self.rows_loaded = end as u64;
            let bytes = self
                .io
                .submit_request(ranges, self.base_priority + start as u64)
                .await?;
            self.loaded.extend(bytes);
            Ok(())
        }
        .boxed()
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    /// Hands off the next `num_rows` loaded payloads (and their validity) as a
    /// decode task.  Payloads were pushed in row order, so draining from the
    /// front yields the correct rows.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
        let validity = self.drain_validity(num_rows as usize)?;
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
        })
    }

    fn data_type(&self) -> &DataType {
        &DataType::LargeBinary
    }
}

/// Decode task that assembles fetched blob payloads into a `LargeBinaryArray`
struct BlobArrayDecodeTask {
    // One payload per row, in row order (empty for empty/null values)
    bytes: Vec<Bytes>,
    validity: Option<NullBuffer>,
}

impl BlobArrayDecodeTask {
    /// Creates a decode task from per-row payloads and optional validity
    fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
        Self { bytes, validity }
    }
}

256impl DecodeArrayTask for BlobArrayDecodeTask {
257    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
258        let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
259        let offsets = self
260            .bytes
261            .iter()
262            .scan(0, |state, b| {
263                let start = *state;
264                *state += b.len();
265                Some(start as i64)
266            })
267            .chain(std::iter::once(num_bytes as i64))
268            .collect::<Vec<_>>();
269        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
270        let mut buffer = Vec::with_capacity(num_bytes);
271        for bytes in self.bytes {
272            buffer.extend_from_slice(&bytes);
273        }
274        let data_buf = Buffer::from_vec(buffer);
275        // data_size is only tracked in the v2.1 structural decode path; the legacy
276        // v2.0 path does not need it so we return 0.
277        Ok((
278            Arc::new(LargeBinaryArray::new(offsets, data_buf, self.validity)),
279            0,
280        ))
281    }
282}
283
/// Encoder that writes blob values out-of-line and encodes only their
/// (position, size) descriptions through the wrapped encoder
pub struct BlobFieldEncoder {
    description_encoder: Box<dyn FieldEncoder>,
}

288impl BlobFieldEncoder {
289    pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
290        Self {
291            description_encoder,
292        }
293    }
294
295    fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
296        let binarray = array.as_binary_opt::<i64>().ok_or_else(|| {
297            Error::invalid_input_source(
298                format!("Expected large_binary and received {}", array.data_type()).into(),
299            )
300        })?;
301        let mut positions = Vec::with_capacity(array.len());
302        let mut sizes = Vec::with_capacity(array.len());
303        let data = binarray.values();
304        let nulls = binarray
305            .nulls()
306            .cloned()
307            .unwrap_or(NullBuffer::new_valid(binarray.len()));
308        for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
309            if is_valid {
310                let start = w[0] as u64;
311                let end = w[1] as u64;
312                let size = end - start;
313                if size > 0 {
314                    let val = data.slice_with_length(start as usize, size as usize);
315                    let position = external_buffers.add_buffer(LanceBuffer::from(val));
316                    positions.push(position);
317                    sizes.push(size);
318                } else {
319                    // Empty values are always (0,0)
320                    positions.push(0);
321                    sizes.push(0);
322                }
323            } else {
324                // Null values are always (1, 0)
325                positions.push(1);
326                sizes.push(0);
327            }
328        }
329        let positions = Arc::new(UInt64Array::from(positions));
330        let sizes = Arc::new(UInt64Array::from(sizes));
331        let descriptions = Arc::new(StructArray::new(
332            BLOB_DESC_FIELDS.clone(),
333            vec![positions, sizes],
334            None,
335        ));
336        Ok(descriptions)
337    }
338}
339
impl FieldEncoder for BlobFieldEncoder {
    /// Converts the incoming large-binary array into descriptions (writing the
    /// payloads to `external_buffers`) and encodes the descriptions instead.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        let descriptions = Self::write_bins(array, external_buffers)?;
        self.description_encoder.maybe_encode(
            descriptions,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )
    }

    // If there is any data left in the buffer then create an encode task from it
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        self.description_encoder.flush(external_buffers)
    }

    fn num_columns(&self) -> u32 {
        self.description_encoder.num_columns()
    }

    /// Finishes the descriptions encoder and wraps its single column encoding
    /// in a Blob column encoding so readers know to decode out-of-line.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
        let inner_finished = self.description_encoder.finish(external_buffers);
        async move {
            let mut cols = inner_finished.await?;
            assert_eq!(cols.len(), 1);
            // Take the inner encoding (leaving a default in place) and re-wrap it
            let encoding = std::mem::take(&mut cols[0].encoding);
            let wrapped_encoding = ColumnEncoding {
                column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
                    inner: Some(Box::new(encoding)),
                }))),
            };
            cols[0].encoding = wrapped_encoding;
            Ok(cols)
        }
        .boxed()
    }
}

#[cfg(test)]
mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::LargeBinaryArray;
    use arrow_schema::{DataType, Field};
    use lance_arrow::BLOB_META_KEY;

    use crate::{
        format::pb::column_encoding,
        testing::{TestCases, check_round_trip_encoding_of_data, check_specific_random},
        version::LanceFileVersion,
    };

    // Field metadata that opts a column into blob encoding
    static BLOB_META: LazyLock<HashMap<String, String>> = LazyLock::new(|| {
        [(BLOB_META_KEY.to_string(), "true".to_string())]
            .iter()
            .cloned()
            .collect::<HashMap<_, _>>()
    });

    // Round-trips random large-binary data through the blob encoding
    #[test_log::test(tokio::test)]
    async fn test_basic_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_specific_random(
            field,
            TestCases::basic().with_max_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    // Verifies blob encoding is used when (and only when) the metadata requests it
    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        let test_cases = TestCases::default()
            .with_max_file_version(LanceFileVersion::V2_1)
            .with_expected_encoding("packed_struct")
            .with_verify_encoding(Arc::new(|cols, version| {
                if version < &LanceFileVersion::V2_1 {
                    // In 2.0 we used a special "column encoding" to mark blob fields.  In 2.1 we
                    // don't do this and just rely on the regular page encoding.
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        // Use blob encoding if requested
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_verify_encoding(Arc::new(|cols, version| {
                if version < &LanceFileVersion::V2_1 {
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(!matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        // Don't use blob encoding if not requested
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }
}
462}