lance_encoding/encodings/logical/primitive/
blob.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Routines for decoding blob data
5//!
6//! The blob structural encoding is a structural encoding where the values (blobs) are stored
7//! out-of-line in the file.  The page contains the descriptions, encoded using some other layout.
8
9use std::{collections::VecDeque, ops::Range, sync::Arc};
10
11use arrow_array::{cast::AsArray, make_array, Array, UInt64Array};
12use bytes::Bytes;
13use futures::{future::BoxFuture, FutureExt};
14use snafu::location;
15
16use lance_core::{
17    cache::DeepSizeOf, datatypes::BLOB_DESC_TYPE, error::LanceOptionExt, Error, Result,
18};
19
20use crate::{
21    buffer::LanceBuffer,
22    data::{BlockInfo, DataBlock, VariableWidthBlock},
23    decoder::{DecodePageTask, DecodedPage, StructuralPageDecoder},
24    encodings::logical::primitive::{CachedPageData, PageLoadTask, StructuralPageScheduler},
25    repdef::{DefinitionInterpretation, RepDefUnraveler},
26    EncodingsIo,
27};
28
/// Target number of blob bytes gathered into a single load task ("shard").
///
/// `BlobPageScheduler::schedule_ranges` starts a new load task once the bytes it has queued
/// reach this threshold.  Larger shards buffer more data in memory and issue bigger requests to
/// the I/O scheduler; smaller shards issue more, smaller requests.
///
/// 32 MiB should be a reasonable default for most workloads.
pub const TARGET_SHARD_SIZE: u64 = 32 << 20;
35
36#[derive(Debug)]
37pub(super) struct BlobDescriptionPageScheduler {
38    inner_scheduler: Box<dyn StructuralPageScheduler>,
39    def_meaning: Arc<[DefinitionInterpretation]>,
40}
41
42impl BlobDescriptionPageScheduler {
43    pub fn new(
44        inner_scheduler: Box<dyn StructuralPageScheduler>,
45        def_meaning: Arc<[DefinitionInterpretation]>,
46    ) -> Self {
47        Self {
48            inner_scheduler,
49            def_meaning,
50        }
51    }
52
53    fn wrap_decoder_fut(
54        decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
55        def_meaning: Arc<[DefinitionInterpretation]>,
56    ) -> BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>> {
57        async move {
58            let decoder = decoder_fut.await?;
59            Ok(
60                Box::new(BlobDescriptionPageDecoder::new(decoder, def_meaning))
61                    as Box<dyn StructuralPageDecoder>,
62            )
63        }
64        .boxed()
65    }
66}
67
68impl StructuralPageScheduler for BlobDescriptionPageScheduler {
69    fn initialize<'a>(
70        &'a mut self,
71        io: &Arc<dyn EncodingsIo>,
72    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
73        self.inner_scheduler.initialize(io)
74    }
75
76    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
77        self.inner_scheduler.load(data);
78    }
79
80    fn schedule_ranges(
81        &self,
82        ranges: &[Range<u64>],
83        io: &Arc<dyn EncodingsIo>,
84    ) -> Result<Vec<PageLoadTask>> {
85        let tasks = self.inner_scheduler.schedule_ranges(ranges, io)?;
86        Ok(tasks
87            .into_iter()
88            .map(|task| PageLoadTask {
89                decoder_fut: Self::wrap_decoder_fut(task.decoder_fut, self.def_meaning.clone()),
90                num_rows: task.num_rows,
91            })
92            .collect())
93    }
94}
95
96#[derive(Debug)]
97struct BlobDescriptionPageDecoder {
98    inner: Box<dyn StructuralPageDecoder>,
99    def_meaning: Arc<[DefinitionInterpretation]>,
100}
101
102impl BlobDescriptionPageDecoder {
103    fn new(
104        inner: Box<dyn StructuralPageDecoder>,
105        def_meaning: Arc<[DefinitionInterpretation]>,
106    ) -> Self {
107        Self { inner, def_meaning }
108    }
109}
110
111impl StructuralPageDecoder for BlobDescriptionPageDecoder {
112    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
113        Ok(Box::new(BlobDescriptionDecodePageTask::new(
114            self.inner.drain(num_rows)?,
115            self.def_meaning.clone(),
116        )))
117    }
118
119    fn num_rows(&self) -> u64 {
120        self.inner.num_rows()
121    }
122}
123
124#[derive(Debug)]
125struct BlobDescriptionDecodePageTask {
126    inner: Box<dyn DecodePageTask>,
127    def_meaning: Arc<[DefinitionInterpretation]>,
128}
129
130impl BlobDescriptionDecodePageTask {
131    fn new(inner: Box<dyn DecodePageTask>, def_meaning: Arc<[DefinitionInterpretation]>) -> Self {
132        Self { inner, def_meaning }
133    }
134}
135
136impl DecodePageTask for BlobDescriptionDecodePageTask {
137    fn decode(self: Box<Self>) -> Result<DecodedPage> {
138        let decoded = self.inner.decode()?;
139        let num_values = decoded.data.num_values();
140
141        // Need to extract out the repdef information
142        let DataBlock::Struct(descriptions) = &decoded.data else {
143            return Err(Error::Internal {
144                message: "Expected struct data block for descriptions".into(),
145                location: location!(),
146            });
147        };
148        let mut description_children = descriptions.children.iter();
149        let DataBlock::FixedWidth(positions) = description_children.next().expect_ok()? else {
150            return Err(Error::Internal {
151                message: "Expected fixed width data block for positions".into(),
152                location: location!(),
153            });
154        };
155        let DataBlock::FixedWidth(sizes) = description_children.next().expect_ok()? else {
156            return Err(Error::Internal {
157                message: "Expected fixed width data block for sizes".into(),
158                location: location!(),
159            });
160        };
161        let positions = positions.data.borrow_to_typed_slice::<u64>();
162        let sizes = sizes.data.borrow_to_typed_slice::<u64>();
163
164        let mut rep = Vec::with_capacity(num_values as usize);
165        let mut def = Vec::with_capacity(num_values as usize);
166
167        for (position, size) in positions.iter().copied().zip(sizes.iter().copied()) {
168            if size == 0 {
169                if position == 0 {
170                    rep.push(0);
171                    def.push(0);
172                } else {
173                    let repval = (position & 0xFFFF) as u16;
174                    let defval = ((position >> 16) & 0xFFFF) as u16;
175                    rep.push(repval);
176                    def.push(defval);
177                }
178            } else {
179                rep.push(0);
180                def.push(0);
181            }
182        }
183
184        let rep = if rep.iter().any(|r| *r != 0) {
185            Some(rep)
186        } else {
187            None
188        };
189        let def = if self.def_meaning.len() > 1
190            || self.def_meaning[0] != DefinitionInterpretation::AllValidItem
191        {
192            Some(def)
193        } else {
194            None
195        };
196
197        let repdef =
198            RepDefUnraveler::new(rep, def, self.def_meaning.clone(), positions.len() as u64);
199
200        Ok(DecodedPage {
201            data: decoded.data,
202            repdef,
203        })
204    }
205}
206
207struct BlobCacheableState {
208    positions: Arc<UInt64Array>,
209    sizes: Arc<UInt64Array>,
210    inner_state: Arc<dyn CachedPageData>,
211}
212
213impl DeepSizeOf for BlobCacheableState {
214    fn deep_size_of_children(&self, context: &mut lance_core::cache::Context) -> usize {
215        self.positions.get_array_memory_size()
216            + self.sizes.get_array_memory_size()
217            + self.inner_state.deep_size_of_children(context)
218    }
219}
220
221impl CachedPageData for BlobCacheableState {
222    fn as_arc_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync + 'static> {
223        self
224    }
225}
226
227#[derive(Debug)]
228pub(super) struct BlobPageScheduler {
229    inner_scheduler: Box<dyn StructuralPageScheduler>,
230    row_number: u64,
231    num_rows: u64,
232    def_meaning: Arc<[DefinitionInterpretation]>,
233    positions: Option<Arc<UInt64Array>>,
234    sizes: Option<Arc<UInt64Array>>,
235}
236
237impl BlobPageScheduler {
238    pub fn new(
239        inner_scheduler: Box<dyn StructuralPageScheduler>,
240        row_number: u64,
241        num_rows: u64,
242        def_meaning: Arc<[DefinitionInterpretation]>,
243    ) -> Self {
244        Self {
245            inner_scheduler,
246            row_number,
247            num_rows,
248            def_meaning,
249            positions: None,
250            sizes: None,
251        }
252    }
253
254    fn create_page_load_task(
255        ranges_to_read: Vec<Range<u64>>,
256        mut loaded_blobs: Vec<LoadedBlob>,
257        first_row_number: u64,
258        io: &dyn EncodingsIo,
259        def_meaning: Arc<[DefinitionInterpretation]>,
260    ) -> Result<PageLoadTask> {
261        let num_rows = loaded_blobs.len() as u64;
262        let read_fut = io.submit_request(ranges_to_read, first_row_number);
263        let decoder_fut = async move {
264            let bytes = read_fut.await?;
265            let mut bytes_iter = bytes.into_iter();
266            for blob in loaded_blobs.iter_mut() {
267                if blob.def == 0 {
268                    blob.set_bytes(bytes_iter.next().expect_ok()?);
269                }
270            }
271            debug_assert!(bytes_iter.next().is_none());
272            Ok(Box::new(BlobPageDecoder::new(loaded_blobs, def_meaning))
273                as Box<dyn StructuralPageDecoder>)
274        }
275        .boxed();
276        Ok(PageLoadTask {
277            decoder_fut,
278            num_rows,
279        })
280    }
281}
282
283impl StructuralPageScheduler for BlobPageScheduler {
284    fn initialize<'a>(
285        &'a mut self,
286        io: &Arc<dyn EncodingsIo>,
287    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
288        let io = io.clone();
289        let num_rows = self.num_rows;
290        async move {
291            let cached = self.inner_scheduler.initialize(&io).await?;
292            let mut desc_decoders = self.inner_scheduler.schedule_ranges(&[0..num_rows], &io)?;
293            if desc_decoders.len() != 1 {
294                // This can't happen yet today so being a little lazy but if it did happen we just
295                // need to concatenate the descriptions.  I'm guessing by then we might be doing something
296                // different than "load all descriptors in initialize" anyways.
297                return Err(Error::NotSupported {
298                    source: "Expected exactly one descriptor decoder".into(),
299                    location: location!(),
300                });
301            }
302            let desc_decoder_task = desc_decoders.pop().unwrap();
303            let mut desc_decoder = desc_decoder_task.decoder_fut.await?;
304
305            let descs = desc_decoder.drain(desc_decoder_task.num_rows)?;
306            let descs = descs.decode()?;
307            let descs = make_array(descs.data.into_arrow(BLOB_DESC_TYPE.clone(), true)?);
308            let descs = descs.as_struct();
309            let positions = Arc::new(
310                descs
311                    .column(0)
312                    .as_any()
313                    .downcast_ref::<UInt64Array>()
314                    .unwrap()
315                    .clone(),
316            );
317            let sizes = Arc::new(
318                descs
319                    .column(1)
320                    .as_any()
321                    .downcast_ref::<UInt64Array>()
322                    .unwrap()
323                    .clone(),
324            );
325            self.positions = Some(positions.clone());
326            self.sizes = Some(sizes.clone());
327            let state = Arc::new(BlobCacheableState {
328                inner_state: cached,
329                positions,
330                sizes,
331            });
332            Ok(state as Arc<dyn CachedPageData>)
333        }
334        .boxed()
335    }
336
337    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
338        let blob_state = data
339            .clone()
340            .as_arc_any()
341            .downcast::<BlobCacheableState>()
342            .unwrap();
343        self.positions = Some(blob_state.positions.clone());
344        self.sizes = Some(blob_state.sizes.clone());
345        self.inner_scheduler.load(&blob_state.inner_state);
346    }
347
348    fn schedule_ranges(
349        &self,
350        ranges: &[Range<u64>],
351        io: &Arc<dyn EncodingsIo>,
352    ) -> Result<Vec<PageLoadTask>> {
353        let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();
354
355        let positions = self.positions.as_ref().expect_ok()?;
356        let sizes = self.sizes.as_ref().expect_ok()?;
357
358        let mut page_load_tasks = Vec::new();
359        let mut bytes_so_far = 0;
360        let mut ranges_to_read = Vec::with_capacity(num_rows as usize);
361        let mut loaded_blobs = Vec::with_capacity(num_rows as usize);
362        let mut first_row_number = None;
363        for range in ranges {
364            for row in range.start..range.end {
365                if first_row_number.is_none() {
366                    first_row_number = Some(row + self.row_number);
367                }
368                let position = positions.value(row as usize);
369                let size = sizes.value(row as usize);
370
371                if size == 0 {
372                    let rep = (position & 0xFFFF) as u16;
373                    let def = ((position >> 16) & 0xFFFF) as u16;
374                    loaded_blobs.push(LoadedBlob::new(rep, def));
375                } else {
376                    loaded_blobs.push(LoadedBlob::new(0, 0));
377                    ranges_to_read.push(position..(position + size));
378                    bytes_so_far += size;
379                }
380
381                if bytes_so_far >= TARGET_SHARD_SIZE {
382                    let page_load_task = Self::create_page_load_task(
383                        std::mem::take(&mut ranges_to_read),
384                        std::mem::take(&mut loaded_blobs),
385                        first_row_number.unwrap(),
386                        io.as_ref(),
387                        self.def_meaning.clone(),
388                    )?;
389                    page_load_tasks.push(page_load_task);
390                    bytes_so_far = 0;
391                    first_row_number = None;
392                }
393            }
394        }
395        if !loaded_blobs.is_empty() {
396            let page_load_task = Self::create_page_load_task(
397                std::mem::take(&mut ranges_to_read),
398                std::mem::take(&mut loaded_blobs),
399                first_row_number.unwrap(),
400                io.as_ref(),
401                self.def_meaning.clone(),
402            )?;
403            page_load_tasks.push(page_load_task);
404        }
405
406        Ok(page_load_tasks)
407    }
408}
409
410#[derive(Debug)]
411struct LoadedBlob {
412    bytes: Option<Bytes>,
413    rep: u16,
414    def: u16,
415}
416
417impl LoadedBlob {
418    fn new(rep: u16, def: u16) -> Self {
419        Self {
420            bytes: None,
421            rep,
422            def,
423        }
424    }
425
426    fn set_bytes(&mut self, bytes: Bytes) {
427        self.bytes = Some(bytes);
428    }
429}
430
431#[derive(Debug)]
432struct BlobPageDecoder {
433    blobs: VecDeque<LoadedBlob>,
434    def_meaning: Arc<[DefinitionInterpretation]>,
435    num_rows: u64,
436}
437
438impl BlobPageDecoder {
439    fn new(blobs: Vec<LoadedBlob>, def_meaning: Arc<[DefinitionInterpretation]>) -> Self {
440        Self {
441            num_rows: blobs.len() as u64,
442            blobs: blobs.into_iter().collect(),
443            def_meaning,
444        }
445    }
446}
447
448impl StructuralPageDecoder for BlobPageDecoder {
449    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
450        let blobs = self.blobs.drain(0..num_rows as usize).collect::<Vec<_>>();
451        Ok(Box::new(BlobDecodePageTask::new(
452            blobs,
453            self.def_meaning.clone(),
454        )))
455    }
456
457    fn num_rows(&self) -> u64 {
458        self.num_rows
459    }
460}
461
462#[derive(Debug)]
463struct BlobDecodePageTask {
464    blobs: Vec<LoadedBlob>,
465    def_meaning: Arc<[DefinitionInterpretation]>,
466}
467
468impl BlobDecodePageTask {
469    fn new(blobs: Vec<LoadedBlob>, def_meaning: Arc<[DefinitionInterpretation]>) -> Self {
470        Self { blobs, def_meaning }
471    }
472}
473
474impl DecodePageTask for BlobDecodePageTask {
475    fn decode(self: Box<Self>) -> Result<DecodedPage> {
476        let num_values = self.blobs.len() as u64;
477        let num_bytes = self
478            .blobs
479            .iter()
480            .filter_map(|b| b.bytes.as_ref())
481            .map(|b| b.len())
482            .sum::<usize>();
483        let mut buffer = Vec::with_capacity(num_bytes);
484        let mut offsets = Vec::with_capacity(num_values as usize + 1);
485        let mut rep = Vec::with_capacity(num_values as usize);
486        let mut def = Vec::with_capacity(num_values as usize);
487        offsets.push(0_u64);
488        for blob in self.blobs {
489            rep.push(blob.rep);
490            def.push(blob.def);
491            if let Some(bytes) = blob.bytes {
492                offsets.push(offsets.last().unwrap() + bytes.len() as u64);
493                buffer.extend_from_slice(&bytes);
494            } else {
495                // Null / emptyvalue
496                offsets.push(*offsets.last().unwrap());
497            }
498        }
499        let offsets = LanceBuffer::reinterpret_vec(offsets);
500        let data = LanceBuffer::from(buffer);
501        let data_block = DataBlock::VariableWidth(VariableWidthBlock {
502            data,
503            offsets,
504            bits_per_offset: 64,
505            num_values,
506            block_info: BlockInfo::new(),
507        });
508
509        let rep = if rep.iter().any(|r| *r != 0) {
510            Some(rep)
511        } else {
512            None
513        };
514        let def = if self.def_meaning.len() > 1
515            || self.def_meaning[0] != DefinitionInterpretation::AllValidItem
516        {
517            Some(def)
518        } else {
519            None
520        };
521
522        Ok(DecodedPage {
523            data: data_block,
524            repdef: RepDefUnraveler::new(rep, def, self.def_meaning, num_values),
525        })
526    }
527}