Skip to main content

lance_encoding/previous/encodings/logical/
primitive.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{fmt::Debug, ops::Range, sync::Arc, vec};
5
6use arrow_array::{Array, ArrayRef, cast::AsArray, make_array};
7use arrow_buffer::bit_util;
8use arrow_schema::DataType;
9use futures::{FutureExt, future::BoxFuture};
10use log::trace;
11
12use crate::decoder::{ColumnBuffers, PageBuffers};
13use crate::previous::decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob};
14use crate::previous::encoder::ArrayEncodingStrategy;
15use crate::utils::accumulation::AccumulationQueue;
16use crate::{data::DataBlock, previous::encodings::physical::decoder_from_array_encoding};
17use lance_core::{Error, Result, datatypes::Field};
18
19use crate::{
20    decoder::{
21        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PageEncoding, PageInfo,
22        PageScheduler, PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext,
23    },
24    encoder::{
25        EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
26    },
27    repdef::RepDefBuilder,
28};
29
/// Per-page state for a primitive column: the physical scheduler for the
/// page's buffers plus the metadata needed to map rows onto the page.
#[derive(Debug)]
struct PrimitivePage {
    // Schedules the physical I/O / decode for this page's encoded buffers
    scheduler: Box<dyn PageScheduler>,
    // Number of rows stored in this page (always > 0; empty pages are
    // filtered out when the field scheduler is constructed)
    num_rows: u64,
    // Index of the page within the column, used for tracing
    page_index: u32,
}
36
/// A field scheduler for primitive fields
///
/// This maps to exactly one column and it assumes that the top-level
/// encoding of each page is "basic".  The basic encoding decodes into an
/// optional buffer of validity and a fixed-width buffer of values
/// which is exactly what we need to create a primitive array.
///
/// Note: we consider booleans and fixed-size-lists of primitive types to be
/// primitive types.  This is slightly different than arrow-rs's definition
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    // Arrow data type the pages decode into
    data_type: DataType,
    // One entry per non-empty page, in file order
    page_schedulers: Vec<PrimitivePage>,
    // Total rows across all (non-empty) pages
    num_rows: u64,
    // Whether decoded arrays should be validated after decoding
    should_validate: bool,
    // Index of this column in the file, used for tracing
    column_index: u32,
}
54
55impl PrimitiveFieldScheduler {
56    pub fn new(
57        column_index: u32,
58        data_type: DataType,
59        pages: Arc<[PageInfo]>,
60        buffers: ColumnBuffers,
61        should_validate: bool,
62    ) -> Self {
63        let page_schedulers = pages
64            .iter()
65            .enumerate()
66            // Buggy versions of Lance could sometimes create empty pages
67            .filter(|(page_index, page)| {
68                log::trace!("Skipping empty page with index {}", page_index);
69                page.num_rows > 0
70            })
71            .map(|(page_index, page)| {
72                let page_buffers = PageBuffers {
73                    column_buffers: buffers,
74                    positions_and_sizes: &page.buffer_offsets_and_sizes,
75                };
76                let scheduler = decoder_from_array_encoding(
77                    page.encoding.as_legacy(),
78                    &page_buffers,
79                    &data_type,
80                );
81                PrimitivePage {
82                    scheduler,
83                    num_rows: page.num_rows,
84                    page_index: page_index as u32,
85                }
86            })
87            .collect::<Vec<_>>();
88        let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
89        Self {
90            data_type,
91            page_schedulers,
92            num_rows,
93            should_validate,
94            column_index,
95        }
96    }
97}
98
/// A scheduling job: a cursor over the requested `ranges` and the
/// scheduler's pages, advanced one page per `schedule_next` call.
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    // Requested row ranges in column-wide coordinates, in order
    ranges: Vec<Range<u64>>,
    // Index of the next page to schedule
    page_idx: usize,
    // Index of the range currently being consumed
    range_idx: usize,
    // Offset added to the start of the current range when scheduling
    // resumes (NOTE(review): initialized to 0 and not advanced anywhere
    // in this file — confirm whether it is still needed)
    range_offset: u64,
    // Row offset of the current page within the column
    global_row_offset: u64,
}
108
109impl<'a> PrimitiveFieldSchedulingJob<'a> {
110    pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
111        Self {
112            scheduler,
113            ranges,
114            page_idx: 0,
115            range_idx: 0,
116            range_offset: 0,
117            global_row_offset: 0,
118        }
119    }
120}
121
impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    /// Schedules I/O for every requested range (or range fragment) that
    /// overlaps the next unscheduled page, producing one logical decoder.
    ///
    /// Each call consumes exactly one page: the job's cursors (`page_idx`,
    /// `range_idx`, `global_row_offset`) are advanced so the next call
    /// resumes at the following page.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Get our current range
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range, cur_page.num_rows
        );
        // Skip entire pages until we have some overlap with our next range
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Now the cur_page has overlap with range.  Continue looping through ranges
        // until we find a range that exceeds the current page

        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            // Translate from column-wide coordinates to page-relative coordinates
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            // Clamp to the page boundary; any remainder of the range will be
            // scheduled against a later page on a subsequent call
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // This range is fully scheduled; advance to the next range
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range extends beyond this page; stop here and let the
                // next call schedule the rest
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // This page is now fully consumed
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        // Kick off the physical I/O for the scheduled ranges; the returned
        // future is handed to the logical decoder and awaited later
        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        #[allow(deprecated)]
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    /// Total number of rows requested by this job across all ranges
    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}
213
214impl FieldScheduler for PrimitiveFieldScheduler {
215    fn num_rows(&self) -> u64 {
216        self.num_rows
217    }
218
219    fn schedule_ranges<'a>(
220        &'a self,
221        ranges: &[std::ops::Range<u64>],
222        // TODO: Could potentially use filter to simplify decode, something of a micro-optimization probably
223        _filter: &FilterExpression,
224    ) -> Result<Box<dyn SchedulingJob + 'a>> {
225        Ok(Box::new(PrimitiveFieldSchedulingJob::new(
226            self,
227            ranges.to_vec(),
228        )))
229    }
230
231    fn initialize<'a>(
232        &'a self,
233        _filter: &'a FilterExpression,
234        _context: &'a SchedulerContext,
235    ) -> BoxFuture<'a, Result<()>> {
236        // 2.0 schedulers do not need to initialize
237        std::future::ready(Ok(())).boxed()
238    }
239}
240
/// Logical decoder for one scheduled slice of a primitive page.
pub struct PrimitiveFieldDecoder {
    // Arrow data type of the decoded array
    data_type: DataType,
    // Future resolving to the physical decoder; taken (set to None) by
    // `wait_for_loaded` when awaited
    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
    // Resolved physical decoder; populated by `wait_for_loaded` or
    // provided up front via `new_from_data`
    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
    // Whether decoded arrays should be validated
    should_validate: bool,
    // Number of rows this decoder was scheduled to produce
    num_rows: u64,
    // Number of rows already handed out via `drain`
    rows_drained: u64,
    // Column/page indices for tracing (u32::MAX when constructed via
    // `new_from_data`, which is not tied to a specific column/page)
    column_index: u32,
    page_index: u32,
}
251
252impl PrimitiveFieldDecoder {
253    pub fn new_from_data(
254        physical_decoder: Arc<dyn PrimitivePageDecoder>,
255        data_type: DataType,
256        num_rows: u64,
257        should_validate: bool,
258    ) -> Self {
259        Self {
260            data_type,
261            unloaded_physical_decoder: None,
262            physical_decoder: Some(physical_decoder),
263            should_validate,
264            num_rows,
265            rows_drained: 0,
266            column_index: u32::MAX,
267            page_index: u32::MAX,
268        }
269    }
270}
271
272impl Debug for PrimitiveFieldDecoder {
273    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274        f.debug_struct("PrimitiveFieldDecoder")
275            .field("data_type", &self.data_type)
276            .field("num_rows", &self.num_rows)
277            .field("rows_drained", &self.rows_drained)
278            .finish()
279    }
280}
281
/// A unit of decode work: materializes `rows_to_take` rows of a page,
/// starting `rows_to_skip` rows in, as an arrow array.
struct PrimitiveFieldDecodeTask {
    // Rows of the physical decoder's output to skip before taking
    rows_to_skip: u64,
    // Number of rows to materialize
    rows_to_take: u64,
    // Whether to validate the decoded array data
    should_validate: bool,
    // Shared handle to the resolved physical decoder
    physical_decoder: Arc<dyn PrimitivePageDecoder>,
    // Arrow data type of the output array
    data_type: DataType,
}
289
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    /// Decodes the scheduled rows into an arrow array.
    ///
    /// # Errors
    ///
    /// Fails if the physical decode fails or if the decoded buffers cannot
    /// be assembled into valid arrow array data.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let block = self
            .physical_decoder
            .decode(self.rows_to_skip, self.rows_to_take)?;

        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);

        // This is a bit of a hack to work around https://github.com/apache/arrow-rs/issues/6302
        //
        // We change from nulls-in-dictionary (storage format) to nulls-in-indices (arrow-rs preferred
        // format)
        //
        // The calculation of logical_nulls is not free and would be good to avoid in the future
        if let DataType::Dictionary(_, _) = self.data_type {
            let dict = array.as_any_dictionary();
            if let Some(nulls) = array.logical_nulls() {
                // Rebuild the dictionary array with the null buffer attached
                // to the indices instead of the values
                let new_indices = dict.keys().to_data();
                let new_array = make_array(
                    new_indices
                        .into_builder()
                        .nulls(Some(nulls))
                        .add_child_data(dict.values().to_data())
                        .data_type(dict.data_type().clone())
                        .build()?,
                );
                return Ok(new_array);
            }
        }
        Ok(array)
    }
}
322
323impl LogicalPageDecoder for PrimitiveFieldDecoder {
324    // TODO: In the future, at some point, we may consider partially waiting for primitive pages by
325    // breaking up large I/O into smaller I/O as a way to accelerate the "time-to-first-decode"
326    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
327        log::trace!(
328            "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
329            loaded_need,
330            self.column_index,
331            self.page_index,
332            self.num_rows
333        );
334        async move {
335            let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
336            self.physical_decoder = Some(Arc::from(physical_decoder));
337            Ok(())
338        }
339        .boxed()
340    }
341
342    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
343        if self.physical_decoder.as_ref().is_none() {
344            return Err(lance_core::Error::internal(format!(
345                "drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited",
346                self.data_type, self.column_index
347            )));
348        }
349
350        let rows_to_skip = self.rows_drained;
351        let rows_to_take = num_rows;
352
353        self.rows_drained += rows_to_take;
354
355        let task = Box::new(PrimitiveFieldDecodeTask {
356            rows_to_skip,
357            rows_to_take,
358            should_validate: self.should_validate,
359            physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
360            data_type: self.data_type.clone(),
361        });
362
363        Ok(NextDecodeTask {
364            task,
365            num_rows: rows_to_take,
366        })
367    }
368
369    fn rows_loaded(&self) -> u64 {
370        if self.unloaded_physical_decoder.is_some() {
371            0
372        } else {
373            self.num_rows
374        }
375    }
376
377    fn rows_drained(&self) -> u64 {
378        if self.unloaded_physical_decoder.is_some() {
379            0
380        } else {
381            self.rows_drained
382        }
383    }
384
385    fn num_rows(&self) -> u64 {
386        self.num_rows
387    }
388
389    fn data_type(&self) -> &DataType {
390        &self.data_type
391    }
392}
393
/// Legacy (2.0) encoder for a single primitive column.
///
/// Buffers incoming arrays until enough bytes accumulate to fill a page,
/// then spawns encode tasks that turn the buffered data into encoded pages.
pub struct PrimitiveFieldEncoder {
    // Buffers arrays until a page's worth of bytes has accumulated
    accumulation_queue: AccumulationQueue,
    // Picks the physical encoding for a batch of arrays
    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    // Index of the column being encoded
    column_index: u32,
    // Schema field for the column
    field: Field,
    // Upper bound on the size of a single encoded page, in bytes
    max_page_bytes: u64,
}
401
impl PrimitiveFieldEncoder {
    /// Creates an encoder for one primitive column.
    ///
    /// # Errors
    ///
    /// Currently infallible; returns `Result` for interface consistency
    /// with other field encoders.
    pub fn try_new(
        options: &EncodingOptions,
        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
        column_index: u32,
        field: Field,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            column_index,
            max_page_bytes: options.max_page_bytes,
            array_encoding_strategy,
            field,
        })
    }

    // Spawns a tokio task that encodes `arrays` into a single page.
    // The returned future resolves to the encoded page (or an internal
    // error if the spawned task itself fails, e.g. panics).
    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
        let encoder = self
            .array_encoding_strategy
            .create_array_encoder(&arrays, &self.field)?;
        let column_idx = self.column_index;
        let data_type = self.field.data_type();

        Ok(tokio::task::spawn(async move {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            let data = DataBlock::from_arrays(&arrays, num_values);
            // Buffer indices are assigned sequentially by the encoder
            let mut buffer_index = 0;
            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows: num_values,
                column_idx,
                row_number: 0, // legacy encoders do not use
            })
        })
        // Flatten the JoinHandle: a failed spawn (panic/cancel) becomes an
        // internal error instead of a nested Result
        .map(|res_res| {
            res_res.unwrap_or_else(|err| {
                Err(Error::internal(format!(
                    "Encoding task failed with error: {:?}",
                    err
                )))
            })
        })
        .boxed())
    }

    // Creates an encode task, consuming all buffered data
    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
        if arrays.len() == 1 {
            // A single large array may exceed max_page_bytes and need to be
            // sliced into several pages
            let array = arrays.into_iter().next().unwrap();
            let size_bytes = array.get_buffer_memory_size();
            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
            // Can't slice it finer than 1 page per row
            let num_parts = num_parts.min(array.len());
            if num_parts <= 1 {
                // One part and it fits in a page
                Ok(vec![self.create_encode_task(vec![array])?])
            } else {
                // One part and it needs to be sliced into multiple pages

                // This isn't perfect (items in the array might not all have the same size)
                // but it's a reasonable stab for now)
                let mut tasks = Vec::with_capacity(num_parts);
                let mut offset = 0;
                let part_size = bit_util::ceil(array.len(), num_parts);
                for _ in 0..num_parts {
                    let avail = array.len() - offset;
                    if avail == 0 {
                        break;
                    }
                    let chunk_size = avail.min(part_size);
                    let part = array.slice(offset, chunk_size);
                    let task = self.create_encode_task(vec![part])?;
                    tasks.push(task);
                    offset += chunk_size;
                }
                Ok(tasks)
            }
        } else {
            // Multiple parts that (presumably) all fit in a page
            //
            // TODO: Could check here if there are any jumbo parts in the mix that need splitting
            Ok(vec![self.create_encode_task(arrays)?])
        }
    }
}
494
495impl FieldEncoder for PrimitiveFieldEncoder {
496    // Buffers data, if there is enough to write a page then we create an encode task
497    fn maybe_encode(
498        &mut self,
499        array: ArrayRef,
500        _external_buffers: &mut OutOfLineBuffers,
501        _repdef: RepDefBuilder,
502        row_number: u64,
503        num_rows: u64,
504    ) -> Result<Vec<EncodeTask>> {
505        if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
506            Ok(self.do_flush(arrays.0)?)
507        } else {
508            Ok(vec![])
509        }
510    }
511
512    // If there is any data left in the buffer then create an encode task from it
513    fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
514        if let Some(arrays) = self.accumulation_queue.flush() {
515            Ok(self.do_flush(arrays.0)?)
516        } else {
517            Ok(vec![])
518        }
519    }
520
521    fn num_columns(&self) -> u32 {
522        1
523    }
524
525    fn finish(
526        &mut self,
527        _external_buffers: &mut OutOfLineBuffers,
528    ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
529        std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
530    }
531}