lance_encoding/previous/encodings/logical/primitive.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{fmt::Debug, ops::Range, sync::Arc, vec};
5
6use arrow_array::{cast::AsArray, make_array, Array, ArrayRef};
7use arrow_buffer::bit_util;
8use arrow_schema::DataType;
9use futures::{future::BoxFuture, FutureExt};
10use log::trace;
11use snafu::location;
12
13use crate::decoder::{ColumnBuffers, PageBuffers};
14use crate::previous::decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob};
15use crate::previous::encoder::ArrayEncodingStrategy;
16use crate::utils::accumulation::AccumulationQueue;
17use crate::{data::DataBlock, previous::encodings::physical::decoder_from_array_encoding};
18use lance_core::{datatypes::Field, Error, Result};
19
20use crate::{
21    decoder::{
22        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PageEncoding, PageInfo,
23        PageScheduler, PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext,
24    },
25    encoder::{
26        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
27    },
28    repdef::RepDefBuilder,
29};
30
/// Metadata and physical scheduler for a single page of a primitive column
#[derive(Debug)]
struct PrimitivePage {
    /// Schedules I/O for this page's buffers
    scheduler: Box<dyn PageScheduler>,
    /// Number of rows stored in this page
    num_rows: u64,
    /// Index of this page within its column
    page_index: u32,
}
37
/// A field scheduler for primitive fields
///
/// This maps to exactly one column and it assumes that the top-level
/// encoding of each page is "basic".  The basic encoding decodes into an
/// optional buffer of validity and a fixed-width buffer of values
/// which is exactly what we need to create a primitive array.
///
/// Note: we consider booleans and fixed-size-lists of primitive types to be
/// primitive types.  This is slightly different than arrow-rs's definition
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    /// Arrow data type the column decodes into
    data_type: DataType,
    /// One entry per (non-empty) page of the column
    page_schedulers: Vec<PrimitivePage>,
    /// Total rows across all pages in `page_schedulers`
    num_rows: u64,
    /// Whether decoded buffers should be validated when building arrays
    should_validate: bool,
    /// Index of this column, used for trace logging and error messages
    column_index: u32,
}
55
56impl PrimitiveFieldScheduler {
57    pub fn new(
58        column_index: u32,
59        data_type: DataType,
60        pages: Arc<[PageInfo]>,
61        buffers: ColumnBuffers,
62        should_validate: bool,
63    ) -> Self {
64        let page_schedulers = pages
65            .iter()
66            .enumerate()
67            // Buggy versions of Lance could sometimes create empty pages
68            .filter(|(page_index, page)| {
69                log::trace!("Skipping empty page with index {}", page_index);
70                page.num_rows > 0
71            })
72            .map(|(page_index, page)| {
73                let page_buffers = PageBuffers {
74                    column_buffers: buffers,
75                    positions_and_sizes: &page.buffer_offsets_and_sizes,
76                };
77                let scheduler = decoder_from_array_encoding(
78                    page.encoding.as_legacy(),
79                    &page_buffers,
80                    &data_type,
81                );
82                PrimitivePage {
83                    scheduler,
84                    num_rows: page.num_rows,
85                    page_index: page_index as u32,
86                }
87            })
88            .collect::<Vec<_>>();
89        let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
90        Self {
91            data_type,
92            page_schedulers,
93            num_rows,
94            should_validate,
95            column_index,
96        }
97    }
98}
99
/// In-progress state for scheduling a set of row ranges against a
/// `PrimitiveFieldScheduler`, advancing one page per `schedule_next` call
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    /// Column-relative row ranges to schedule
    ranges: Vec<Range<u64>>,
    /// Index of the next page to schedule from
    page_idx: usize,
    /// Index of the range currently being scheduled
    range_idx: usize,
    /// Offset into the current range that has already been scheduled
    range_offset: u64,
    /// Number of rows contained in all pages before `page_idx`
    global_row_offset: u64,
}
109
110impl<'a> PrimitiveFieldSchedulingJob<'a> {
111    pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
112        Self {
113            scheduler,
114            ranges,
115            page_idx: 0,
116            range_idx: 0,
117            range_offset: 0,
118            global_row_offset: 0,
119        }
120    }
121}
122
impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    /// Schedules I/O for the next page, emitting exactly one decoder message.
    ///
    /// Each call consumes one page: every range (or portion of a range) that
    /// overlaps the page is converted to a page-relative range and handed to
    /// the page's physical scheduler.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Get our current range
        let mut range = self.ranges[self.range_idx].clone();
        // NOTE(review): range_offset is never advanced anywhere in this impl;
        // it appears a range spanning pages is instead resumed via the
        // `range.start.max(self.global_row_offset)` clamp below — confirm
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Skip entire pages until we have some overlap with our next range
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Now the cur_page has overlap with range.  Continue looping through ranges
        // until we find a range that exceeds the current page

        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // Clamp to the page start; also resumes a range that began in an
            // earlier page and continues into this one
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            // A range extending past this page is truncated at the page edge
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // Range fully scheduled; move on to the next range (if any)
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // Range spills past this page; it will resume on the next call
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // This page is finished regardless of whether the current range is
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        // Wrap the pending physical decoder in a logical decoder; the I/O is
        // not awaited here — that happens in wait_for_loaded
        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        #[allow(deprecated)]
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    /// Total number of rows requested across all ranges of this job
    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}
215
216impl FieldScheduler for PrimitiveFieldScheduler {
217    fn num_rows(&self) -> u64 {
218        self.num_rows
219    }
220
221    fn schedule_ranges<'a>(
222        &'a self,
223        ranges: &[std::ops::Range<u64>],
224        // TODO: Could potentially use filter to simplify decode, something of a micro-optimization probably
225        _filter: &FilterExpression,
226    ) -> Result<Box<dyn SchedulingJob + 'a>> {
227        Ok(Box::new(PrimitiveFieldSchedulingJob::new(
228            self,
229            ranges.to_vec(),
230        )))
231    }
232
233    fn initialize<'a>(
234        &'a self,
235        _filter: &'a FilterExpression,
236        _context: &'a SchedulerContext,
237    ) -> BoxFuture<'a, Result<()>> {
238        // 2.0 schedulers do not need to initialize
239        std::future::ready(Ok(())).boxed()
240    }
241}
242
/// Logical decoder for one scheduled slice of a primitive page
pub struct PrimitiveFieldDecoder {
    data_type: DataType,
    /// Future resolving to the physical decoder once its I/O completes
    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
    /// Set once `wait_for_loaded` has been awaited
    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
    /// Whether to validate buffers when converting to arrow
    should_validate: bool,
    /// Number of rows scheduled for this decoder
    num_rows: u64,
    /// Rows already handed out via `drain`
    rows_drained: u64,
    /// Column / page indices for diagnostics (u32::MAX when unknown)
    column_index: u32,
    page_index: u32,
}
253
254impl PrimitiveFieldDecoder {
255    pub fn new_from_data(
256        physical_decoder: Arc<dyn PrimitivePageDecoder>,
257        data_type: DataType,
258        num_rows: u64,
259        should_validate: bool,
260    ) -> Self {
261        Self {
262            data_type,
263            unloaded_physical_decoder: None,
264            physical_decoder: Some(physical_decoder),
265            should_validate,
266            num_rows,
267            rows_drained: 0,
268            column_index: u32::MAX,
269            page_index: u32::MAX,
270        }
271    }
272}
273
274impl Debug for PrimitiveFieldDecoder {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        f.debug_struct("PrimitiveFieldDecoder")
277            .field("data_type", &self.data_type)
278            .field("num_rows", &self.num_rows)
279            .field("rows_drained", &self.rows_drained)
280            .finish()
281    }
282}
283
/// CPU-bound task that decodes a contiguous slice of rows from a page
struct PrimitiveFieldDecodeTask {
    /// Rows to skip from the start of the scheduled data
    rows_to_skip: u64,
    /// Number of rows to decode
    rows_to_take: u64,
    /// Whether to validate buffers when converting to arrow
    should_validate: bool,
    physical_decoder: Arc<dyn PrimitivePageDecoder>,
    data_type: DataType,
}
291
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    /// Decodes the task's row slice into an arrow array.
    ///
    /// Dictionary arrays get an extra pass to move nulls from the dictionary
    /// values into the keys (see comment below).
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let block = self
            .physical_decoder
            .decode(self.rows_to_skip, self.rows_to_take)?;

        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);

        // This is a bit of a hack to work around https://github.com/apache/arrow-rs/issues/6302
        //
        // We change from nulls-in-dictionary (storage format) to nulls-in-indices (arrow-rs preferred
        // format)
        //
        // The calculation of logical_nulls is not free and would be good to avoid in the future
        if let DataType::Dictionary(_, _) = self.data_type {
            let dict = array.as_any_dictionary();
            if let Some(nulls) = array.logical_nulls() {
                // Rebuild the keys with the logical null buffer attached,
                // keeping the dictionary values as the single child
                let new_indices = dict.keys().to_data();
                let new_array = make_array(
                    new_indices
                        .into_builder()
                        .nulls(Some(nulls))
                        .add_child_data(dict.values().to_data())
                        .data_type(dict.data_type().clone())
                        .build()?,
                );
                return Ok(new_array);
            }
        }
        Ok(array)
    }
}
324
impl LogicalPageDecoder for PrimitiveFieldDecoder {
    // TODO: In the future, at some point, we may consider partially waiting for primitive pages by
    // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode"
    /// Awaits the page I/O and installs the physical decoder.
    ///
    /// Loading is all-or-nothing: `loaded_need` is only used for logging.
    /// NOTE(review): the future is `take`n here, so awaiting this a second
    /// time would panic on the `unwrap`.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        log::trace!(
            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
            loaded_need,
            self.column_index,
            self.page_index,
            self.num_rows
        );
        async move {
            let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
            self.physical_decoder = Some(Arc::from(physical_decoder));
            Ok(())
        }
        .boxed()
    }

    /// Produces a decode task for the next `num_rows` rows.
    ///
    /// # Errors
    ///
    /// Returns `Error::Internal` if called before `wait_for_loaded` resolved.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        if self.physical_decoder.as_ref().is_none() {
            return Err(lance_core::Error::Internal {
                message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
                location: location!(),
            });
        }

        // NOTE(review): there is no check that rows_drained + num_rows stays
        // within self.num_rows; the caller is trusted not to over-drain
        let rows_to_skip = self.rows_drained;
        let rows_to_take = num_rows;

        self.rows_drained += rows_to_take;

        let task = Box::new(PrimitiveFieldDecodeTask {
            rows_to_skip,
            rows_to_take,
            should_validate: self.should_validate,
            physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
            data_type: self.data_type.clone(),
        });

        Ok(NextDecodeTask {
            task,
            num_rows: rows_to_take,
        })
    }

    /// All-or-nothing: 0 until the page is loaded, then every row
    fn rows_loaded(&self) -> u64 {
        if self.unloaded_physical_decoder.is_some() {
            0
        } else {
            self.num_rows
        }
    }

    /// Reports 0 until loaded (nothing can drain before the await)
    fn rows_drained(&self) -> u64 {
        if self.unloaded_physical_decoder.is_some() {
            0
        } else {
            self.rows_drained
        }
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
395
/// Field encoder for primitive fields
///
/// Buffers incoming arrays until enough bytes accumulate for a page, then
/// spawns encode tasks (splitting oversized buffers across pages).
pub struct PrimitiveFieldEncoder {
    /// Accumulates arrays until a page's worth of bytes has arrived
    accumulation_queue: AccumulationQueue,
    /// Picks the physical encoding for each page
    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    column_index: u32,
    field: Field,
    /// Upper bound on a single page's size; larger inputs are sliced
    max_page_bytes: u64,
}
403
impl PrimitiveFieldEncoder {
    /// Creates an encoder for a single primitive column.
    ///
    /// Currently infallible; returns `Result` for consistency with other
    /// encoder constructors.
    pub fn try_new(
        options: &EncodingOptions,
        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
        column_index: u32,
        field: Field,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            column_index,
            max_page_bytes: options.max_page_bytes,
            array_encoding_strategy,
            field,
        })
    }

    /// Spawns a tokio task that encodes `arrays` into one page.
    ///
    /// A panicked or cancelled task is surfaced as `Error::Internal` rather
    /// than propagating the JoinError.
    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
        let encoder = self
            .array_encoding_strategy
            .create_array_encoder(&arrays, &self.field)?;
        let column_idx = self.column_index;
        let data_type = self.field.data_type();

        Ok(tokio::task::spawn(async move {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            let data = DataBlock::from_arrays(&arrays, num_values);
            // buffer_index hands out sequential ids to the page's buffers
            let mut buffer_index = 0;
            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows: num_values,
                column_idx,
                row_number: 0, // legacy encoders do not use
            })
        })
        .map(|res_res| {
            // Flatten the JoinError (panic/cancellation) into our error type
            res_res.unwrap_or_else(|err| {
                Err(Error::Internal {
                    message: format!("Encoding task failed with error: {:?}", err),
                    location: location!(),
                })
            })
        })
        .boxed())
    }

    // Creates an encode task, consuming all buffered data
    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
        if arrays.len() == 1 {
            let array = arrays.into_iter().next().unwrap();
            let size_bytes = array.get_buffer_memory_size();
            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
            // Can't slice it finer than 1 page per row
            let num_parts = num_parts.min(array.len());
            if num_parts <= 1 {
                // One part and it fits in a page
                Ok(vec![self.create_encode_task(vec![array])?])
            } else {
                // One part and it needs to be sliced into multiple pages

                // This isn't perfect (items in the array might not all have the same size)
                // but it's a reasonable stab for now)
                let mut tasks = Vec::with_capacity(num_parts);
                let mut offset = 0;
                let part_size = bit_util::ceil(array.len(), num_parts);
                for _ in 0..num_parts {
                    let avail = array.len() - offset;
                    // Rounding in part_size can exhaust the array early
                    if avail == 0 {
                        break;
                    }
                    let chunk_size = avail.min(part_size);
                    let part = array.slice(offset, chunk_size);
                    let task = self.create_encode_task(vec![part])?;
                    tasks.push(task);
                    offset += chunk_size;
                }
                Ok(tasks)
            }
        } else {
            // Multiple parts that (presumably) all fit in a page
            //
            // TODO: Could check here if there are any jumbo parts in the mix that need splitting
            Ok(vec![self.create_encode_task(arrays)?])
        }
    }
}
496
497impl FieldEncoder for PrimitiveFieldEncoder {
498    // Buffers data, if there is enough to write a page then we create an encode task
499    fn maybe_encode(
500        &mut self,
501        array: ArrayRef,
502        _external_buffers: &mut OutOfLineBuffers,
503        _repdef: RepDefBuilder,
504        row_number: u64,
505        num_rows: u64,
506    ) -> Result<Vec<EncodeTask>> {
507        if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
508            Ok(self.do_flush(arrays.0)?)
509        } else {
510            Ok(vec![])
511        }
512    }
513
514    // If there is any data left in the buffer then create an encode task from it
515    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
516        if let Some(arrays) = self.accumulation_queue.flush() {
517            Ok(self.do_flush(arrays.0)?)
518        } else {
519            Ok(vec![])
520        }
521    }
522
523    fn num_columns(&self) -> u32 {
524        1
525    }
526
527    fn finish(
528        &mut self,
529        _external_buffers: &mut OutOfLineBuffers,
530    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
531        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
532    }
533}