lance_encoding/v2/encodings/logical/primitive.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{fmt::Debug, ops::Range, sync::Arc, vec};
5
6use arrow::array::AsArray;
7use arrow_array::{make_array, Array, ArrayRef};
8use arrow_buffer::bit_util;
9use arrow_schema::DataType;
10use futures::{future::BoxFuture, FutureExt};
11use log::trace;
12use snafu::location;
13
14use crate::decoder::{ColumnBuffers, PageBuffers};
15use crate::utils::accumulation::AccumulationQueue;
16use crate::v2::decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob};
17use crate::v2::encoder::ArrayEncodingStrategy;
18use crate::{data::DataBlock, v2::encodings::physical::decoder_from_array_encoding};
19use lance_core::{datatypes::Field, Error, Result};
20
21use crate::{
22    decoder::{
23        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PageEncoding, PageInfo,
24        PageScheduler, PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext,
25    },
26    encoder::{
27        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
28    },
29    repdef::RepDefBuilder,
30};
31
/// Per-page scheduling state for one page of a primitive column
#[derive(Debug)]
struct PrimitivePage {
    /// Physical scheduler that issues I/O for this page's buffers
    scheduler: Box<dyn PageScheduler>,
    /// Number of rows stored in this page
    num_rows: u64,
    /// Index of the page within the column (used for logging)
    page_index: u32,
}
38
/// A field scheduler for primitive fields
///
/// This maps to exactly one column and it assumes that the top-level
/// encoding of each page is "basic".  The basic encoding decodes into an
/// optional buffer of validity and a fixed-width buffer of values
/// which is exactly what we need to create a primitive array.
///
/// Note: we consider booleans and fixed-size-lists of primitive types to be
/// primitive types.  This is slightly different than arrow-rs's definition
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    /// Arrow data type of the column being scheduled
    data_type: DataType,
    /// One entry per non-empty page, in file order
    page_schedulers: Vec<PrimitivePage>,
    /// Total row count across all pages in `page_schedulers`
    num_rows: u64,
    /// Whether decoded data should be validated when converted to arrow
    should_validate: bool,
    /// Index of this column within the file (used for logging / errors)
    column_index: u32,
}
56
57impl PrimitiveFieldScheduler {
58    pub fn new(
59        column_index: u32,
60        data_type: DataType,
61        pages: Arc<[PageInfo]>,
62        buffers: ColumnBuffers,
63        should_validate: bool,
64    ) -> Self {
65        let page_schedulers = pages
66            .iter()
67            .enumerate()
68            // Buggy versions of Lance could sometimes create empty pages
69            .filter(|(page_index, page)| {
70                log::trace!("Skipping empty page with index {}", page_index);
71                page.num_rows > 0
72            })
73            .map(|(page_index, page)| {
74                let page_buffers = PageBuffers {
75                    column_buffers: buffers,
76                    positions_and_sizes: &page.buffer_offsets_and_sizes,
77                };
78                let scheduler = decoder_from_array_encoding(
79                    page.encoding.as_legacy(),
80                    &page_buffers,
81                    &data_type,
82                );
83                PrimitivePage {
84                    scheduler,
85                    num_rows: page.num_rows,
86                    page_index: page_index as u32,
87                }
88            })
89            .collect::<Vec<_>>();
90        let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
91        Self {
92            data_type,
93            page_schedulers,
94            num_rows,
95            should_validate,
96            column_index,
97        }
98    }
99}
100
/// A single-use job that walks a set of row ranges across the scheduler's
/// pages, emitting one scan line per page per `schedule_next` call.
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    /// Row ranges (in column row coordinates) to schedule, in order
    ranges: Vec<Range<u64>>,
    /// Index of the next page to schedule from
    page_idx: usize,
    /// Index of the range currently being consumed
    range_idx: usize,
    /// Offset applied to the start of the current range when scheduling resumes
    range_offset: u64,
    /// Row offset of the current page's first row within the column
    global_row_offset: u64,
}
110
111impl<'a> PrimitiveFieldSchedulingJob<'a> {
112    pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
113        Self {
114            scheduler,
115            ranges,
116            page_idx: 0,
117            range_idx: 0,
118            range_offset: 0,
119            global_row_offset: 0,
120        }
121    }
122}
123
impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    /// Schedules the next batch of rows: all of the (remaining) requested
    /// ranges that overlap the current page.  Returns one scan line whose
    /// decoder covers exactly those rows.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Get our current range
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Skip entire pages until we have some overlap with our next range
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Now the cur_page has overlap with range.  Continue looping through ranges
        // until we find a range that exceeds the current page

        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // Clamp the range to the page and translate to page-local coordinates
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            // True when the page covers the remainder of this range
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // Move on to the next requested range (if any remain)
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range extends past this page; resume it on the next page
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // This page is fully consumed; advance to the next one
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        // Kick off the I/O for the selected ranges; the returned future
        // resolves to the physical decoder once the reads complete
        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        #[allow(deprecated)]
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    /// Total rows requested by this job (sum of all range lengths)
    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}
216
217impl FieldScheduler for PrimitiveFieldScheduler {
218    fn num_rows(&self) -> u64 {
219        self.num_rows
220    }
221
222    fn schedule_ranges<'a>(
223        &'a self,
224        ranges: &[std::ops::Range<u64>],
225        // TODO: Could potentially use filter to simplify decode, something of a micro-optimization probably
226        _filter: &FilterExpression,
227    ) -> Result<Box<dyn SchedulingJob + 'a>> {
228        Ok(Box::new(PrimitiveFieldSchedulingJob::new(
229            self,
230            ranges.to_vec(),
231        )))
232    }
233
234    fn initialize<'a>(
235        &'a self,
236        _filter: &'a FilterExpression,
237        _context: &'a SchedulerContext,
238    ) -> BoxFuture<'a, Result<()>> {
239        // 2.0 schedulers do not need to initialize
240        std::future::ready(Ok(())).boxed()
241    }
242}
243
/// Logical decoder for one scheduled slice of a primitive column page
pub struct PrimitiveFieldDecoder {
    /// Arrow data type produced by this decoder
    data_type: DataType,
    /// Pending I/O; resolves to the physical decoder.  `Some` until
    /// `wait_for_loaded` is awaited, then moved into `physical_decoder`.
    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
    /// Loaded physical decoder (set after waiting, or at construction
    /// when built from in-memory data)
    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
    /// Whether to validate arrow data when decoding
    should_validate: bool,
    /// Total rows this decoder is responsible for
    num_rows: u64,
    /// Rows already handed out via `drain`
    rows_drained: u64,
    /// Column index for diagnostics (u32::MAX when built via `new_from_data`)
    column_index: u32,
    /// Page index for diagnostics (u32::MAX when built via `new_from_data`)
    page_index: u32,
}
254
255impl PrimitiveFieldDecoder {
256    pub fn new_from_data(
257        physical_decoder: Arc<dyn PrimitivePageDecoder>,
258        data_type: DataType,
259        num_rows: u64,
260        should_validate: bool,
261    ) -> Self {
262        Self {
263            data_type,
264            unloaded_physical_decoder: None,
265            physical_decoder: Some(physical_decoder),
266            should_validate,
267            num_rows,
268            rows_drained: 0,
269            column_index: u32::MAX,
270            page_index: u32::MAX,
271        }
272    }
273}
274
275impl Debug for PrimitiveFieldDecoder {
276    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277        f.debug_struct("PrimitiveFieldDecoder")
278            .field("data_type", &self.data_type)
279            .field("num_rows", &self.num_rows)
280            .field("rows_drained", &self.rows_drained)
281            .finish()
282    }
283}
284
/// CPU-only task that decodes a window of rows from a loaded page
struct PrimitiveFieldDecodeTask {
    /// Rows to skip from the start of the scheduled data
    rows_to_skip: u64,
    /// Rows to decode after skipping
    rows_to_take: u64,
    /// Whether to validate the arrow data produced by the decode
    should_validate: bool,
    /// Physical decoder holding the loaded page data
    physical_decoder: Arc<dyn PrimitivePageDecoder>,
    /// Arrow type to decode into
    data_type: DataType,
}
292
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    /// Decodes `rows_to_take` rows (after skipping `rows_to_skip`) into an
    /// arrow array, optionally validating the result.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        // Run the physical decode for the requested row window
        let block = self
            .physical_decoder
            .decode(self.rows_to_skip, self.rows_to_take)?;

        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);

        // This is a bit of a hack to work around https://github.com/apache/arrow-rs/issues/6302
        //
        // We change from nulls-in-dictionary (storage format) to nulls-in-indices (arrow-rs preferred
        // format)
        //
        // The calculation of logical_nulls is not free and would be good to avoid in the future
        if let DataType::Dictionary(_, _) = self.data_type {
            let dict = array.as_any_dictionary();
            if let Some(nulls) = array.logical_nulls() {
                // Rebuild the dictionary array with the null buffer attached
                // to the key (index) data instead of the dictionary values
                let new_indices = dict.keys().to_data();
                let new_array = make_array(
                    new_indices
                        .into_builder()
                        .nulls(Some(nulls))
                        .add_child_data(dict.values().to_data())
                        .data_type(dict.data_type().clone())
                        .build()?,
                );
                return Ok(new_array);
            }
        }
        Ok(array)
    }
}
325
326impl LogicalPageDecoder for PrimitiveFieldDecoder {
327    // TODO: In the future, at some point, we may consider partially waiting for primitive pages by
328    // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode"
329    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
330        log::trace!(
331            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
332            loaded_need,
333            self.column_index,
334            self.page_index,
335            self.num_rows
336        );
337        async move {
338            let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
339            self.physical_decoder = Some(Arc::from(physical_decoder));
340            Ok(())
341        }
342        .boxed()
343    }
344
345    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
346        if self.physical_decoder.as_ref().is_none() {
347            return Err(lance_core::Error::Internal {
348                message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
349                location: location!(),
350            });
351        }
352
353        let rows_to_skip = self.rows_drained;
354        let rows_to_take = num_rows;
355
356        self.rows_drained += rows_to_take;
357
358        let task = Box::new(PrimitiveFieldDecodeTask {
359            rows_to_skip,
360            rows_to_take,
361            should_validate: self.should_validate,
362            physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
363            data_type: self.data_type.clone(),
364        });
365
366        Ok(NextDecodeTask {
367            task,
368            num_rows: rows_to_take,
369        })
370    }
371
372    fn rows_loaded(&self) -> u64 {
373        if self.unloaded_physical_decoder.is_some() {
374            0
375        } else {
376            self.num_rows
377        }
378    }
379
380    fn rows_drained(&self) -> u64 {
381        if self.unloaded_physical_decoder.is_some() {
382            0
383        } else {
384            self.rows_drained
385        }
386    }
387
388    fn num_rows(&self) -> u64 {
389        self.num_rows
390    }
391
392    fn data_type(&self) -> &DataType {
393        &self.data_type
394    }
395}
396
/// Field encoder for primitive columns
///
/// Buffers incoming arrays until enough bytes accumulate for a page, then
/// encodes them using the configured array encoding strategy.
pub struct PrimitiveFieldEncoder {
    /// Accumulates arrays until a page's worth of data is buffered
    accumulation_queue: AccumulationQueue,
    /// Strategy used to choose an encoding for each flushed batch
    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    /// Index of the column this encoder writes
    column_index: u32,
    /// Schema field being encoded
    field: Field,
    /// Maximum size, in bytes, of a single encoded page
    max_page_bytes: u64,
}
404
impl PrimitiveFieldEncoder {
    /// Creates an encoder for one primitive column.
    ///
    /// `options` supplies the per-column buffering threshold, whether to keep
    /// original arrays, and the maximum encoded page size.
    pub fn try_new(
        options: &EncodingOptions,
        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
        column_index: u32,
        field: Field,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            column_index,
            max_page_bytes: options.max_page_bytes,
            array_encoding_strategy,
            field,
        })
    }

    /// Spawns a background task that encodes `arrays` into a single page.
    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
        // Choose the encoder synchronously so strategy errors surface immediately
        let encoder = self
            .array_encoding_strategy
            .create_array_encoder(&arrays, &self.field)?;
        let column_idx = self.column_index;
        let data_type = self.field.data_type();

        Ok(tokio::task::spawn(async move {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            let data = DataBlock::from_arrays(&arrays, num_values);
            let mut buffer_index = 0;
            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows: num_values,
                column_idx,
                row_number: 0, // legacy encoders do not use
            })
        })
        // Fold a JoinError (panicked or cancelled task) into a normal Error
        // so callers only ever deal with a Result
        .map(|res_res| {
            res_res.unwrap_or_else(|err| {
                Err(Error::Internal {
                    message: format!("Encoding task failed with error: {:?}", err),
                    location: location!(),
                })
            })
        })
        .boxed())
    }

    // Creates an encode task, consuming all buffered data
    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
        if arrays.len() == 1 {
            let array = arrays.into_iter().next().unwrap();
            let size_bytes = array.get_buffer_memory_size();
            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
            // Can't slice it finer than 1 page per row
            let num_parts = num_parts.min(array.len());
            if num_parts <= 1 {
                // One part and it fits in a page
                Ok(vec![self.create_encode_task(vec![array])?])
            } else {
                // One array, but it exceeds max_page_bytes and needs to be
                // sliced into multiple pages
                //
                // This isn't perfect (items in the array might not all have the same size)
                // but it's a reasonable stab for now)
                let mut tasks = Vec::with_capacity(num_parts);
                let mut offset = 0;
                let part_size = bit_util::ceil(array.len(), num_parts);
                for _ in 0..num_parts {
                    let avail = array.len() - offset;
                    if avail == 0 {
                        break;
                    }
                    let chunk_size = avail.min(part_size);
                    let part = array.slice(offset, chunk_size);
                    let task = self.create_encode_task(vec![part])?;
                    tasks.push(task);
                    offset += chunk_size;
                }
                Ok(tasks)
            }
        } else {
            // Multiple parts that (presumably) all fit in a page
            //
            // TODO: Could check here if there are any jumbo parts in the mix that need splitting
            Ok(vec![self.create_encode_task(arrays)?])
        }
    }
}
497
498impl FieldEncoder for PrimitiveFieldEncoder {
499    // Buffers data, if there is enough to write a page then we create an encode task
500    fn maybe_encode(
501        &mut self,
502        array: ArrayRef,
503        _external_buffers: &mut OutOfLineBuffers,
504        _repdef: RepDefBuilder,
505        row_number: u64,
506        num_rows: u64,
507    ) -> Result<Vec<EncodeTask>> {
508        if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
509            Ok(self.do_flush(arrays.0)?)
510        } else {
511            Ok(vec![])
512        }
513    }
514
515    // If there is any data left in the buffer then create an encode task from it
516    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
517        if let Some(arrays) = self.accumulation_queue.flush() {
518            Ok(self.do_flush(arrays.0)?)
519        } else {
520            Ok(vec![])
521        }
522    }
523
524    fn num_columns(&self) -> u32 {
525        1
526    }
527
528    fn finish(
529        &mut self,
530        _external_buffers: &mut OutOfLineBuffers,
531    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
532        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
533    }
534}