Skip to main content

lance_encoding/previous/encodings/logical/
blob.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::VecDeque, sync::Arc, vec};
5
6use arrow_array::{
7    Array, ArrayRef, LargeBinaryArray, PrimitiveArray, StructArray, UInt64Array, cast::AsArray,
8    types::UInt64Type,
9};
10use arrow_buffer::{
11    BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
12};
13use arrow_schema::DataType;
14use bytes::Bytes;
15use futures::{FutureExt, future::BoxFuture};
16
17use lance_core::{Error, Result, datatypes::BLOB_DESC_FIELDS};
18
19use crate::{
20    EncodingsIo,
21    buffer::LanceBuffer,
22    decoder::{
23        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
24        ScheduledScanLine, SchedulerContext,
25    },
26    encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
27    format::pb::{Blob, ColumnEncoding, column_encoding},
28    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
29    repdef::RepDefBuilder,
30};
31
/// A field scheduler for blob-encoded large binary data.
///
/// Large binary values (1MiB+) are inefficient to store as a regular primitive: we
/// essentially end up with one page per row (or a few rows), and the per-page metadata
/// overhead can be significant.
///
/// At the same time the benefits of using pages (contiguous arrays) are small at these
/// sizes, since individual values can be read with random access without much penalty.
///
/// The blob encoding therefore gives up contiguous pages and stores each value out of
/// line, keeping only a small per-row description (position and size) in the column
/// itself. This scheduler schedules reads of those descriptions through
/// `descriptions_scheduler`; the decoders it produces then fetch the referenced bytes.
#[derive(Debug)]
pub struct BlobFieldScheduler {
    /// Scheduler for the per-row (position, size) description column.
    descriptions_scheduler: Arc<dyn FieldScheduler>,
}

impl BlobFieldScheduler {
    /// Creates a blob scheduler that reads descriptions with `descriptions_scheduler`.
    pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
        Self {
            descriptions_scheduler,
        }
    }
}
55
/// Scheduling job for a blob field.
///
/// Wraps the job that schedules the description column and turns each description
/// decoder it yields into a [`BlobFieldDecoder`].
#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    /// Job scheduling the per-row (position, size) description column.
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}
60
61impl SchedulingJob for BlobFieldSchedulingJob<'_> {
62    fn schedule_next(
63        &mut self,
64        context: &mut SchedulerContext,
65        priority: &dyn PriorityRange,
66    ) -> Result<ScheduledScanLine> {
67        let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
68        let mut priority = priority.current_priority();
69        let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
70            let decoder = decoder.into_legacy();
71            let path = decoder.path;
72            let mut decoder = decoder.decoder;
73            let num_rows = decoder.num_rows();
74            let descriptions_fut = async move {
75                decoder
76                    .wait_for_loaded(decoder.num_rows() - 1)
77                    .await
78                    .unwrap();
79                let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
80                descriptions_task.task.decode().map(|(arr, _)| arr)
81            }
82            .boxed();
83            let decoder = Box::new(BlobFieldDecoder {
84                io: context.io().clone(),
85                unloaded_descriptions: Some(descriptions_fut),
86                positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
87                sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
88                num_rows,
89                loaded: VecDeque::new(),
90                validity: VecDeque::new(),
91                rows_loaded: 0,
92                rows_drained: 0,
93                base_priority: priority,
94            });
95            priority += num_rows;
96            MessageType::DecoderReady(DecoderReady { decoder, path })
97        });
98        Ok(ScheduledScanLine {
99            decoders: decoders.collect(),
100            rows_scheduled: next_descriptions.rows_scheduled,
101        })
102    }
103
104    fn num_rows(&self) -> u64 {
105        self.descriptions_job.num_rows()
106    }
107}
108
109impl FieldScheduler for BlobFieldScheduler {
110    fn schedule_ranges<'a>(
111        &'a self,
112        ranges: &[std::ops::Range<u64>],
113        filter: &FilterExpression,
114    ) -> Result<Box<dyn SchedulingJob + 'a>> {
115        let descriptions_job = self
116            .descriptions_scheduler
117            .schedule_ranges(ranges, filter)?;
118        Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
119    }
120
121    fn num_rows(&self) -> u64 {
122        self.descriptions_scheduler.num_rows()
123    }
124
125    fn initialize<'a>(
126        &'a self,
127        filter: &'a FilterExpression,
128        context: &'a SchedulerContext,
129    ) -> BoxFuture<'a, Result<()>> {
130        self.descriptions_scheduler.initialize(filter, context)
131    }
132}
133
/// Decoder for a run of blob rows.
///
/// It turns per-row (position, size) descriptions into `LargeBinary` values by
/// fetching the referenced bytes through `io`.
pub struct BlobFieldDecoder {
    /// I/O used to fetch the out-of-line blob bytes.
    io: Arc<dyn EncodingsIo>,
    /// Future yielding the description struct array; `Some` until the first
    /// `wait_for_loaded` call takes it.
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    /// Description column 0: byte position of each row's value.
    positions: PrimitiveArray<UInt64Type>,
    /// Description column 1: byte length of each row's value.
    sizes: PrimitiveArray<UInt64Type>,
    /// Total number of rows handled by this decoder.
    num_rows: u64,
    /// Fetched row values not yet drained, in row order (assumes `submit_request`
    /// returns one buffer per requested range).
    loaded: VecDeque<Bytes>,
    /// Validity of the loaded-but-undrained rows: one buffer per load, the front one
    /// possibly partially consumed.
    validity: VecDeque<BooleanBuffer>,
    /// Number of rows whose bytes have been fetched.
    rows_loaded: u64,
    /// Number of rows handed out by `drain`.
    rows_drained: u64,
    /// I/O priority base: a load starting at row `i` is requested with priority
    /// `base_priority + i`.
    base_priority: u64,
}
146
147impl BlobFieldDecoder {
148    fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
149        let mut validity = BooleanBufferBuilder::new(num_values);
150        let mut remaining = num_values;
151        while remaining > 0 {
152            let next = self.validity.front_mut().unwrap();
153            if remaining < next.len() {
154                let slice = next.slice(0, remaining);
155                validity.append_buffer(&slice);
156                *next = next.slice(remaining, next.len() - remaining);
157                remaining = 0;
158            } else {
159                validity.append_buffer(next);
160                remaining -= next.len();
161                self.validity.pop_front();
162            }
163        }
164        let nulls = NullBuffer::new(validity.finish());
165        if nulls.null_count() == 0 {
166            Ok(None)
167        } else {
168            Ok(Some(nulls))
169        }
170    }
171}
172
173impl std::fmt::Debug for BlobFieldDecoder {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        f.debug_struct("BlobFieldDecoder")
176            .field("num_rows", &self.num_rows)
177            .field("rows_loaded", &self.rows_loaded)
178            .field("rows_drained", &self.rows_drained)
179            .finish()
180    }
181}
182
183impl LogicalPageDecoder for BlobFieldDecoder {
184    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
185        async move {
186            if self.unloaded_descriptions.is_some() {
187                let descriptions = self.unloaded_descriptions.take().unwrap().await?;
188                let descriptions = descriptions.as_struct();
189                self.positions = descriptions.column(0).as_primitive().clone();
190                self.sizes = descriptions.column(1).as_primitive().clone();
191            }
192            let start = self.rows_loaded as usize;
193            let end = (loaded_need + 1).min(self.num_rows) as usize;
194            let positions = self.positions.values().slice(start, end - start);
195            let sizes = self.sizes.values().slice(start, end - start);
196            let ranges = positions
197                .iter()
198                .zip(sizes.iter())
199                .map(|(position, size)| *position..(*position + *size))
200                .collect::<Vec<_>>();
201            let validity = positions
202                .iter()
203                .zip(sizes.iter())
204                .map(|(p, s)| *p != 1 || *s != 0)
205                .collect::<BooleanBuffer>();
206            // Run the I/O before mutating decoder state, so a failed load
207            // leaves the decoder untouched and `wait_for_loaded` is the single,
208            // clean point of failure.
209            let bytes = self
210                .io
211                .submit_request(ranges, self.base_priority + start as u64)
212                .await?;
213            self.validity.push_back(validity);
214            self.loaded.extend(bytes);
215            self.rows_loaded = end as u64;
216            Ok(())
217        }
218        .boxed()
219    }
220
221    fn rows_loaded(&self) -> u64 {
222        self.rows_loaded
223    }
224
225    fn num_rows(&self) -> u64 {
226        self.num_rows
227    }
228
229    fn rows_drained(&self) -> u64 {
230        self.rows_drained
231    }
232
233    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
234        if num_rows as usize > self.loaded.len() {
235            // `loaded` is populated by `wait_for_loaded`; guard the
236            // load-before-drain contract so a violation surfaces as an error
237            // rather than an out-of-bounds drain panic.
238            return Err(Error::internal(format!(
239                "BlobFieldDecoder was asked to drain {num_rows} rows but only \
240                 {} are loaded",
241                self.loaded.len(),
242            )));
243        }
244        let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
245        let validity = self.drain_validity(num_rows as usize)?;
246        self.rows_drained += num_rows;
247        Ok(NextDecodeTask {
248            num_rows,
249            task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
250        })
251    }
252
253    fn data_type(&self) -> &DataType {
254        &DataType::LargeBinary
255    }
256}
257
/// Decode task that assembles drained blob values into a `LargeBinaryArray`.
struct BlobArrayDecodeTask {
    /// Raw bytes of each row, in row order.
    bytes: Vec<Bytes>,
    /// Row validity; `None` when every row is valid.
    validity: Option<NullBuffer>,
}

impl BlobArrayDecodeTask {
    /// Creates a task from per-row bytes and their (optional) validity.
    fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
        Self { bytes, validity }
    }
}
268
269impl DecodeArrayTask for BlobArrayDecodeTask {
270    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
271        let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
272        let offsets = self
273            .bytes
274            .iter()
275            .scan(0, |state, b| {
276                let start = *state;
277                *state += b.len();
278                Some(start as i64)
279            })
280            .chain(std::iter::once(num_bytes as i64))
281            .collect::<Vec<_>>();
282        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
283        let mut buffer = Vec::with_capacity(num_bytes);
284        for bytes in self.bytes {
285            buffer.extend_from_slice(&bytes);
286        }
287        let data_buf = Buffer::from_vec(buffer);
288        // data_size is only tracked in the v2.1 structural decode path; the legacy
289        // v2.0 path does not need it so we return 0.
290        Ok((
291            Arc::new(LargeBinaryArray::new(offsets, data_buf, self.validity)),
292            0,
293        ))
294    }
295}
296
/// Field encoder for blob columns.
///
/// Each `LargeBinary` value is written to the out-of-line buffers, and a per-row
/// (position, size) description array is passed on to `description_encoder`. When
/// finished, the inner column encoding is wrapped in a `Blob` column encoding.
pub struct BlobFieldEncoder {
    /// Encoder for the description struct array built from each input batch.
    description_encoder: Box<dyn FieldEncoder>,
}
300
301impl BlobFieldEncoder {
302    pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
303        Self {
304            description_encoder,
305        }
306    }
307
308    fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
309        let binarray = array.as_binary_opt::<i64>().ok_or_else(|| {
310            Error::invalid_input_source(
311                format!("Expected large_binary and received {}", array.data_type()).into(),
312            )
313        })?;
314        let mut positions = Vec::with_capacity(array.len());
315        let mut sizes = Vec::with_capacity(array.len());
316        let data = binarray.values();
317        let nulls = binarray
318            .nulls()
319            .cloned()
320            .unwrap_or(NullBuffer::new_valid(binarray.len()));
321        for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
322            if is_valid {
323                let start = w[0] as u64;
324                let end = w[1] as u64;
325                let size = end - start;
326                if size > 0 {
327                    let val = data.slice_with_length(start as usize, size as usize);
328                    let position = external_buffers.add_buffer(LanceBuffer::from(val));
329                    positions.push(position);
330                    sizes.push(size);
331                } else {
332                    // Empty values are always (0,0)
333                    positions.push(0);
334                    sizes.push(0);
335                }
336            } else {
337                // Null values are always (1, 0)
338                positions.push(1);
339                sizes.push(0);
340            }
341        }
342        let positions = Arc::new(UInt64Array::from(positions));
343        let sizes = Arc::new(UInt64Array::from(sizes));
344        let descriptions = Arc::new(StructArray::new(
345            BLOB_DESC_FIELDS.clone(),
346            vec![positions, sizes],
347            None,
348        ));
349        Ok(descriptions)
350    }
351}
352
353impl FieldEncoder for BlobFieldEncoder {
354    fn maybe_encode(
355        &mut self,
356        array: ArrayRef,
357        external_buffers: &mut OutOfLineBuffers,
358        repdef: RepDefBuilder,
359        row_number: u64,
360        num_rows: u64,
361    ) -> Result<Vec<EncodeTask>> {
362        let descriptions = Self::write_bins(array, external_buffers)?;
363        self.description_encoder.maybe_encode(
364            descriptions,
365            external_buffers,
366            repdef,
367            row_number,
368            num_rows,
369        )
370    }
371
372    // If there is any data left in the buffer then create an encode task from it
373    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
374        self.description_encoder.flush(external_buffers)
375    }
376
377    fn num_columns(&self) -> u32 {
378        self.description_encoder.num_columns()
379    }
380
381    fn finish(
382        &mut self,
383        external_buffers: &mut OutOfLineBuffers,
384    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
385        let inner_finished = self.description_encoder.finish(external_buffers);
386        async move {
387            let mut cols = inner_finished.await?;
388            assert_eq!(cols.len(), 1);
389            let encoding = std::mem::take(&mut cols[0].encoding);
390            let wrapped_encoding = ColumnEncoding {
391                column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
392                    inner: Some(Box::new(encoding)),
393                }))),
394            };
395            cols[0].encoding = wrapped_encoding;
396            Ok(cols)
397        }
398        .boxed()
399    }
400}
401
#[cfg(test)]
mod tests {
    use std::{
        collections::{HashMap, VecDeque},
        ops::Range,
        sync::{Arc, LazyLock},
    };

    use arrow_array::{LargeBinaryArray, PrimitiveArray, types::UInt64Type};
    use arrow_schema::{DataType, Field};
    use bytes::Bytes;
    use futures::{FutureExt, future::BoxFuture};
    use lance_arrow::BLOB_META_KEY;
    use lance_core::{Error, Result};

    use super::BlobFieldDecoder;
    use crate::{
        EncodingsIo,
        format::pb::column_encoding,
        previous::decoder::LogicalPageDecoder,
        testing::{TestCases, check_round_trip_encoding_of_data, check_specific_random},
        version::LanceFileVersion,
    };

    /// Field metadata (`BLOB_META_KEY` = "true") used by these tests to request
    /// blob encoding for a field.
    static BLOB_META: LazyLock<HashMap<String, String>> = LazyLock::new(|| {
        [(BLOB_META_KEY.to_string(), "true".to_string())]
            .iter()
            .cloned()
            .collect::<HashMap<_, _>>()
    });

    /// Random round-trip of a non-nullable blob field, up to file version 2.1.
    #[test_log::test(tokio::test)]
    async fn test_basic_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_specific_random(
            field,
            TestCases::basic().with_max_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    /// Round-trips a small array containing a null, both with and without the
    /// blob metadata, and checks which column encoding was chosen.
    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        let test_cases = TestCases::default()
            .with_max_file_version(LanceFileVersion::V2_1)
            .with_expected_encoding("packed_struct")
            .with_verify_encoding(Arc::new(|cols, version| {
                if version < &LanceFileVersion::V2_1 {
                    // In 2.0 we used a special "column encoding" to mark blob fields.  In 2.1 we
                    // don't do this and just rely on the regular page encoding.
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        // Use blob encoding if requested
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        // NOTE(review): these cases use `with_min_file_version(V2_1)`, so every
        // `version` seen below is >= V2_1 and the `version < V2_1` branch never
        // runs -- the "not blob-encoded" assertion is currently dead. Confirm the
        // intended version range before relying on this check.
        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_verify_encoding(Arc::new(|cols, version| {
                if version < &LanceFileVersion::V2_1 {
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(!matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        // Don't use blob encoding if not requested
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }

    /// An `EncodingsIo` that rejects every request, simulating cloud storage
    /// returning a retryable error (e.g. an exhausted HTTP 503 retry budget).
    #[derive(Debug)]
    struct FailingScheduler;

    impl EncodingsIo for FailingScheduler {
        fn submit_request(
            &self,
            _ranges: Vec<Range<u64>>,
            _priority: u64,
        ) -> BoxFuture<'static, Result<Vec<Bytes>>> {
            std::future::ready(Err(Error::io("simulated HTTP 503 from cloud storage"))).boxed()
        }
    }

    /// A failed blob load must surface through `wait_for_loaded` and leave the
    /// decoder untouched -- never half-advanced into a state where a later
    /// `drain` reads rows that never loaded (which panicked with "range end
    /// index N out of range for slice of length 0").
    #[test_log::test(tokio::test)]
    async fn test_io_failure_leaves_blob_decoder_consistent() {
        let num_rows = 8u64;
        // `positions`/`sizes` only need `num_rows` entries; the failing
        // scheduler rejects the request regardless of the ranges it is given.
        // (All-zero descriptions mean every row is an empty (0, 0) value.)
        let descs = PrimitiveArray::<UInt64Type>::from_iter_values(std::iter::repeat_n(
            0u64,
            num_rows as usize,
        ));

        // Build the decoder directly with descriptions already "loaded", so the
        // only I/O performed is the blob-bytes request.
        let mut decoder = BlobFieldDecoder {
            io: Arc::new(FailingScheduler),
            unloaded_descriptions: None,
            positions: descs.clone(),
            sizes: descs,
            num_rows,
            loaded: VecDeque::new(),
            validity: VecDeque::new(),
            rows_loaded: 0,
            rows_drained: 0,
            base_priority: 0,
        };

        // `wait_for_loaded` propagates the I/O failure...
        assert!(decoder.wait_for_loaded(num_rows - 1).await.is_err());

        // ...and leaves no half-loaded state behind.
        assert_eq!(decoder.rows_loaded, 0);
        assert!(decoder.loaded.is_empty());
        assert!(decoder.validity.is_empty());

        // A drain in this state errors instead of panicking on the empty buffer.
        assert!(decoder.drain(num_rows).is_err());
    }
}