lance_encoding/previous/encodings/logical/list.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{collections::VecDeque, ops::Range, sync::Arc};

use arrow_array::{
    cast::AsArray,
    new_empty_array,
    types::{Int32Type, Int64Type, UInt64Type},
    Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
use arrow_schema::{DataType, Field, Fields};
use futures::{future::BoxFuture, FutureExt};
use lance_core::{cache::LanceCache, Error, Result};
use log::trace;
use snafu::location;
use tokio::task::JoinHandle;

use crate::{
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlock, FixedWidthDataBlock},
    decoder::{
        DecodeArrayTask, DecodeBatchScheduler, FilterExpression, ListPriorityRange, MessageType,
        NextDecodeTask, PageEncoding, PriorityRange, ScheduledScanLine, SchedulerContext,
    },
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    format::pb,
    previous::{
        decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob},
        encoder::{ArrayEncoder, EncodedArray},
        encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler},
    },
    repdef::RepDefBuilder,
    utils::accumulation::AccumulationQueue,
    EncodingsIo,
};

// Scheduling lists is tricky.  Imagine the following scenario:
//
// * There are 2000 offsets per offsets page
// * The user requests range 8000..8500
//
// First, since 8000 matches the start of an offsets page, we don't need to read an extra offset.
//
// Since this range matches the start of a page, we know we will get an offsets array like
// [0, ...]
//
// We need to restore nulls, which relies on a null offset adjustment, which is unique to each offsets
// page.
//
// We need to map this to [X, ...] where X is the sum of the number of items in the 0-2000, 2000-4000,
// 4000-6000, and 6000-8000 pages.
//
// This gets even trickier if a range spans multiple offsets pages.  For example, take the same
// scenario but suppose the user requests 7999..8500.  In this case the first page read will include an
// extra offset (e.g. we need to read 7998..8000), the null adjustment will be different between the
// two, and the items offset will be different.
//
// To handle this, we take the incoming row requests, look at the page info, and then calculate
// list requests.

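// As a concrete sketch of that arithmetic (using the scenario above, and assuming every page
// holds 2000 offsets): the request 7999..8500 touches the pages covering offsets 6000..8000
// and 8000..10000.  It becomes two list requests: one for the single list at 7999 (which needs
// an extra offset, so we read offsets starting at 7998) and one for the 500 lists at 8000..8500
// (which starts at a page boundary and needs no extra offset).  Each request carries its own
// page's null_offset_adjustment and an items_offset equal to the total number of items in all
// earlier pages.
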
#[derive(Debug)]
struct ListRequest {
    /// How many lists this request maps to
    num_lists: u64,
    /// Did this request include an extra offset
    includes_extra_offset: bool,
    /// The null offset adjustment for this request
    null_offset_adjustment: u64,
    /// items offset to apply
    items_offset: u64,
}

#[derive(Debug)]
struct ListRequestsIter {
    // The `includes_extra_offset` flag on each request tracks whether an extra offset must be skipped
    list_requests: VecDeque<ListRequest>,
    offsets_requests: Vec<Range<u64>>,
}

impl ListRequestsIter {
    // TODO: This logic relies on row_ranges being ordered and may be a problem when we
    // add proper support for out-of-order take
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        // Each row range maps to at least one list request.  It may map to more if the
        // range spans multiple offsets pages.
        for range in row_ranges {
            let mut range = range.clone();

            // Skip any offsets pages that are before the range
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range starts at the beginning of an offsets page we don't need
            // to read an extra offset
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // At this point our range overlaps the current page (cur_page_info) and
            // we can start slicing it into list requests
            while !range.is_empty() {
                // The end of the list request is the min of the end of the range
                // and the end of the current page
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                includes_extra_offset = false;
                range.start = end;
                // If there is still more data in the range, we need to move to the
                // next page
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    // Given a page of offset data, grab the corresponding list requests
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            // If the request did not start at zero then we need to read an extra offset
            if req.includes_extra_offset {
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}
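
// A minimal sketch of the 7999..8500 example from the comment at the top of this
// file, written as a unit test.  The page size and per-page item counts here are
// invented for illustration.
#[cfg(test)]
mod list_requests_iter_tests {
    use super::*;

    #[test]
    fn splits_range_across_offsets_pages() {
        // Five pages of 2000 offsets, each referencing 10,000 items
        let page_infos = (0..5)
            .map(|_| OffsetPageInfo {
                offsets_in_page: 2000,
                null_offset_adjustment: 10_001, // arbitrary for this sketch
                num_items_referenced_by_page: 10_000,
            })
            .collect::<Vec<_>>();
        let iter = ListRequestsIter::new(&[7999..8500], &page_infos);

        // 7999 is not a page boundary, so one extra offset (7998) is read
        assert_eq!(iter.offsets_requests, vec![7998..8500]);
        // One list from the 6000..8000 page, then 500 lists from the 8000..10000 page
        assert_eq!(iter.list_requests.len(), 2);
        assert_eq!(iter.list_requests[0].num_lists, 1);
        assert!(iter.list_requests[0].includes_extra_offset);
        assert_eq!(iter.list_requests[0].items_offset, 30_000);
        assert_eq!(iter.list_requests[1].num_lists, 500);
        assert!(!iter.list_requests[1].includes_extra_offset);
        assert_eq!(iter.list_requests[1].items_offset, 40_000);
    }
}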

/// Given a list of offsets and a list of requested list row ranges we need to rewrite the offsets so that
/// they appear as expected for a list array.  This involves a number of tasks:
///
///  * Nulls in the offsets are represented by oversize values and these need to be converted to
///    the appropriate length
///  * For each range we (usually) load N + 1 offsets, so if we have 5 ranges we have 5 extra values
///    and we need to drop 4 of those.
///  * Ranges may not start at 0 and, while we don't strictly need to, we want to go ahead and normalize
///    the offsets so that the first offset is 0.
///
/// Throughout the comments we will consider the following example case:
///
/// The user requests the following ranges of lists (list_row_ranges): [0..3, 5..6]
///
/// This is a total of 4 lists.  The loaded offsets are [10, 20, 120, 150, 60].  The last valid offset is 99.
/// The null_offset_adjustment will be 100.
///
/// Our desired output offsets are going to be [0, 10, 20, 20, 30] and the item ranges are [0..20] and [50..60]
/// The validity array is [true, true, false, true]
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    // In our example this is [10, 20, 120, 150, 60]
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    // In our example there are 4 total lists
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    // The first output offset is always 0 no matter what
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    let mut offsets_offset: u32 = 0;
    // All ranges should be non-empty
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        // The # of lists in this particular range
        let num_lists = req.num_lists;

        // Because we know the first offset is always 0 we don't store that.  This means we have special
        // logic if a range starts at 0 (we didn't need to read an extra offset value in that case)
        // In our example we enter this special case on the first range (0..3) but not the second (5..6)
        // This means the first range, which has 3 lists, maps to 3 values in our offsets array [10, 20, 120]
        // However, the second range, which has 1 list, maps to 2 values in our offsets array [150, 60]
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                // In our example items_start is 0 and items_end is 20
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                // In our example, offsets_offset will be 3, items_start will be 50, and items_end will
                // be 60
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // TODO: Maybe consider writing whether there are nulls or not as part of the
        // page description.  Then we can skip all validity work.  Not clear if that will
        // be any benefit though.

        // We calculate validity from all elements but the first (or all elements
        // if this is the special zero-start case)
        //
        // So, in our first pass through, we consider [10, 20, 120] (1 null)
        // In our second pass through we only consider [60] (0 nulls)
        // Note that the 150 is null but we only loaded it to know where the 50-60 list started
        // and it doesn't actually correspond to a list (e.g. list 4 is null but we aren't loading it
        // here)
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            validity_buffer.append(*off < null_offset_adjustment);
        }

        // In our special case we also need to emit the offset for the implicit first list (0..first_item)
        if !req.includes_extra_offset {
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        // Finally, we go through and shift the offsets.  If we just returned them as is (taking care of
        // nulls) we would get [0, 10, 20, 20, 60] but our last list only has 10 items, not 40, and so we
        // need to shift that 60 down to a 30.
        normalized_offsets.extend(
                offsets_values
                    .slice(offsets_to_norm_start, num_offsets_to_norm)
                    .windows(2)
                    .map(|w| {
                        let start = w[0] % null_offset_adjustment;
                        let end = w[1] % null_offset_adjustment;
                        if end < start {
                            panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                        }
                        let length = end - start;
                        last_normalized_offset += length;
                        last_normalized_offset
                    }),
            );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists,
            items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        if !items_range.is_empty() {
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}
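
// A minimal sketch pinning down the worked example from the doc comment above as a
// unit test; the expected values are the ones derived in that comment.
#[cfg(test)]
mod decode_offsets_tests {
    use super::*;

    #[test]
    fn decodes_doc_comment_example() {
        // Ranges 0..3 and 5..6 with null_offset_adjustment = 100
        let list_requests = vec![
            ListRequest {
                num_lists: 3,
                includes_extra_offset: false,
                null_offset_adjustment: 100,
                items_offset: 0,
            },
            ListRequest {
                num_lists: 1,
                includes_extra_offset: true,
                null_offset_adjustment: 100,
                items_offset: 0,
            },
        ];
        let offsets = UInt64Array::from(vec![10, 20, 120, 150, 60]);
        let (item_ranges, normalized, validity) =
            decode_offsets(&offsets, &list_requests, 100);
        assert_eq!(item_ranges, VecDeque::from(vec![0..20, 50..60]));
        assert_eq!(normalized, vec![0, 10, 20, 20, 30]);
        assert_eq!(
            validity.iter().collect::<Vec<_>>(),
            vec![true, true, false, true]
        );
    }
}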

/// After scheduling the offsets we immediately launch this task as a new tokio task.
/// This task waits for the offsets to arrive, decodes them, and then schedules the I/O
/// for the items.
///
/// This task does not wait for the items data.  That happens in the main decode loop (unless
/// we have list of list of ... in which case it happens in the outer indirect decode loop)
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // We know the offsets are a primitive array and thus will not need additional
    // pages, so we can wait for all of them at once
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let offsets = decode_task.task.decode()?;

    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // All requested lists are empty
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Create a new root scheduler, which has one column, which is our items data
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
    #[allow(deprecated)]
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        // Can't push filters into list items
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}

#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    offsets: Box<dyn SchedulingJob + 'a>,
    num_rows: u64,
    list_requests_iter: ListRequestsIter,
}

impl<'a> ListFieldSchedulingJob<'a> {
    fn try_new(
        scheduler: &'a ListFieldScheduler,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Self> {
        let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let offsets = scheduler
            .offsets_scheduler
            .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
        Ok(Self {
            scheduler,
            offsets,
            list_requests_iter,
            num_rows,
        })
    }
}

impl SchedulingJob for ListFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled,
            list_reqs
        );
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        // It shouldn't be possible for `list_reqs` to span more than one offsets page and so it shouldn't
        // be possible for the null_offset_adjustment to change
        debug_assert!(list_reqs
            .iter()
            .all(|req| req.null_offset_adjustment == null_offset_adjustment));
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // The offsets column is uint64, which guarantees exactly one decoder per call to schedule_next
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let io = context.io().clone();
        let cache = context.cache().clone();

        // Immediately spawn the indirect scheduling
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // Return a decoder
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        #[allow(deprecated)]
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// A page scheduler for list fields that encodes offsets in one field and items in another
///
/// The list scheduler is somewhat unique because it requires indirect I/O.  We cannot know the
/// ranges we need simply by looking at the metadata.  This means that list scheduling doesn't
/// fit neatly into the two-thread schedule-loop / decode-loop model.  To handle this, when a
/// list page is scheduled, we only schedule the I/O for the offsets and then we immediately
/// launch a new tokio task.  This new task waits for the offsets, decodes them, and then
/// schedules the I/O for the items.  Keep in mind that list items can be lists themselves.  If
/// that is the case then this indirection will continue.  The decode task that is returned will
/// only finish `wait`ing when all of the I/O has completed.
///
/// Whenever we schedule follow-up I/O like this the priority is based on the top-level row
/// index.  This helps ensure that earlier rows get finished completely (including follow up
/// tasks) before we perform I/O for later rows.
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    offset_type: DataType,
    list_type: DataType,
    offset_page_info: Vec<OffsetPageInfo>,
}

/// The offsets are stored in a uint64 encoded column.  For each page we
/// store some supplementary data that helps us understand the offsets.
/// This is needed to construct the scheduler
#[derive(Debug)]
pub struct OffsetPageInfo {
    pub offsets_in_page: u64,
    pub null_offset_adjustment: u64,
    pub num_items_referenced_by_page: u64,
}

impl ListFieldScheduler {
    /// Creates a new ListFieldScheduler
    pub fn new(
        offsets_scheduler: Arc<dyn FieldScheduler>,
        items_scheduler: Arc<dyn FieldScheduler>,
        items_field: Arc<Field>,
        // Should be int32 or int64
        offset_type: DataType,
        offset_page_info: Vec<OffsetPageInfo>,
    ) -> Self {
        let list_type = match &offset_type {
            DataType::Int32 => DataType::List(items_field.clone()),
            DataType::Int64 => DataType::LargeList(items_field.clone()),
            _ => panic!("Unexpected offset type {}", offset_type),
        };
        Self {
            offsets_scheduler,
            items_scheduler,
            items_field,
            offset_type,
            offset_page_info,
            list_type,
        }
    }
}

impl FieldScheduler for ListFieldScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(ListFieldSchedulingJob::try_new(
            self, ranges, filter,
        )?))
    }

    fn num_rows(&self) -> u64 {
        self.offsets_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // 2.0 schedulers do not need to initialize
        std::future::ready(Ok(())).boxed()
    }
}

/// As soon as the first call to decode comes in we wait for all indirect I/O to
/// complete.
///
/// Once the indirect I/O is finished we take the result out of `unloaded`, await it
/// (this await should return immediately), and store the offsets, validity, and item
/// decoder it contains.
///
/// We then drain from `item_decoder`, popping item pages off as we finish with
/// them.
///
/// TODO: Test the case where a single list page has multiple items pages
#[derive(Debug)]
struct ListPageDecoder {
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // offsets and validity will have already been decoded as part of the indirect I/O
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    offset_type: DataType,
    data_type: DataType,
}

struct ListDecodeTask {
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    // Will be None if there are no items (all empty / null lists)
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    offset_type: DataType,
}

impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let items = self
            .items
            .map(|items| {
                // When we run the indirect I/O we wrap things in a struct array with a single field
                // named "item".  We can unwrap that now.
                let wrapped_items = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        // The offsets are already decoded but they need to be shifted back to 0 and cast
        // to the appropriate type
        //
        // The shift is not merely cosmetic.  In some cases it is strictly required, since
        // the unshifted offsets may cross i32::MAX even though the shifted offsets do not.
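        //
        // For example (a sketch): unshifted offsets [2_147_483_648, 2_147_483_653] do not
        // fit in i32, but after subtracting the first value they become [0, 5], which does.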
        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Ok(Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Ok(Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}

// Helper function that performs binary search.  However, once the
// target is found it walks past any duplicates.  E.g. if the
// input list is [0, 3, 5, 5, 5, 7] then this will only return
// 0, 1, 4, or 5.
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    let mut result = match to_search.binary_search(&target) {
        Ok(idx) => idx,
        Err(idx) => idx - 1,
    };
    while result < (to_search.len() - 1) && to_search[result + 1] == target {
        result += 1;
    }
    result as u64
}
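
// A quick sketch pinning down the duplicate-walking behavior described above.
#[cfg(test)]
mod binary_search_to_end_tests {
    use super::*;

    #[test]
    fn walks_past_duplicates() {
        let offsets = [0, 3, 5, 5, 5, 7];
        assert_eq!(binary_search_to_end(&offsets, 0), 0);
        assert_eq!(binary_search_to_end(&offsets, 4), 1); // not found: round down
        assert_eq!(binary_search_to_end(&offsets, 5), 4); // last of the duplicates
        assert_eq!(binary_search_to_end(&offsets, 7), 5);
    }
}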

impl LogicalPageDecoder for ListPageDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            // wait for the indirect I/O to finish, run the scheduler for the indirect
            // I/O and then wait for enough items to arrive
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if let Err(err) = indirectly_loaded {
                    match err.try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            // We need more than X lists which means we need at least X+1 lists which means
            // we need at least offsets[X+1] items which means we need more than offsets[X+1]-1 items.
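            // For example (a sketch, not tied to any real page): with offsets [0, 10, 20],
            // loaded_need = 0 means we need more than 0 lists, i.e. at least 1 list, which
            // needs offsets[1] = 10 items, i.e. strictly more than 9 loaded items.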
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded.  To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // We already have the offsets but need to drain the item pages
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            // We might not be able to drain `num_rows` because that request might contain more than 2^31 items
            // so we need to figure out how many rows we can actually drain.
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                // TODO: This could be slow.  Maybe faster to start from zero or do binary search.  Investigate when
                // actually adding support for smaller than requested batches
                actual_num_rows -= 1;
            }
        }
        if actual_num_rows < num_rows {
            // TODO: We should be able to automatically
            // shrink the read batch size if we detect the batches are going to be huge (maybe
            // even achieve this with a read_batch_bytes parameter, though some estimation may
            // still be required)
            return Err(Error::NotSupported { source: format!("loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches", num_rows).into(), location: location!() });
        }
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

struct IndirectlyLoaded {
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    root_decoder: Option<SimpleStructDecoder>,
}

impl std::fmt::Debug for IndirectlyLoaded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndirectlyLoaded")
            .field("offsets", &self.offsets)
            .field("validity", &self.validity)
            .finish()
    }
}

/// An encoder for list offsets that "stitches" offsets and encodes nulls into the offsets
///
/// If we need to encode several list arrays into a single page then we need to "stitch" the offsets.
/// For example, imagine we have list arrays [[0, 1], [2]] and [[3, 4, 5]].
///
/// We will have offset arrays [0, 2, 3] and [0, 3].  We don't want to encode [0, 2, 3, 0, 3].  What
/// we want is [0, 2, 3, 6]
///
/// This encoder also handles validity by converting a null value into an oversized offset.  For example,
/// if we have four lists with offsets [0, 20, 20, 20, 30] and the list at index 2 is null (note that
/// the list at index 1 is empty) then we turn this into offsets [0, 20, 20, 51, 30].  We replace a null
/// offset with previous_offset + max_offset + 1.  This makes it possible to load a single list from the
/// list array.
///
/// These offsets are always stored on disk as a u64 array.  First, this is because it's simply much more
/// likely than one might expect that this is needed, even if our lists are not massive.  This is because we
/// only write an offsets page when we have enough data.  This means we will probably accumulate a million
/// offsets or more before we bother to write a page.  If our lists have a few thousand items apiece then
/// we end up passing the u32::MAX boundary.
///
/// The second reason is that list offsets are very easily compacted with delta + bit packing and so those
/// u64 offsets should easily be shrunk down before being put on disk.
///
/// This encoder can encode both lists and large lists.  It can decode the resulting column into either type
/// as well. (TODO: Test and enable large lists)
///
/// You can even write as a large list and decode as a regular list (as long as no single list has more than
/// 2^31 items) or vice versa.  You could even encode a mixed stream of list and large list (but it is unclear
/// that would ever be useful)
#[derive(Debug)]
struct ListOffsetsEncoder {
    // An accumulation queue, we insert both offset arrays and validity arrays into this queue
    accumulation_queue: AccumulationQueue,
    // The inner encoder of offset values
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}

impl ListOffsetsEncoder {
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }

    /// Given a list array, return the offsets as a standalone ArrayRef (either an Int32Array or Int64Array)
    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }

    /// Converts the validity of a list array into a boolean array.  If there is no validity information
    /// then this is an empty boolean array.
    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            // We convert None validity into an empty array because the accumulation queue can't
            // handle Option<ArrayRef>
            new_empty_array(&DataType::Boolean)
        }
    }

    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        // At this point we should have 2*N arrays where the even-indexed arrays are integer offsets
        // and the odd-indexed arrays are boolean validity bitmaps
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();

        tokio::task::spawn(async move {
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                row_number: 0, // Legacy encoders do not use this field
            })
        })
        .map(|res_res| res_res.unwrap())
        .boxed()
    }

    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        let num_rows = offsets.len() as u64;
        // Either inserting the offsets OR inserting the validity could cause the
        // accumulation queue to fill up
        if let Some(mut arrays) = self
            .accumulation_queue
            .insert(offsets, /*row_number=*/ 0, num_rows)
        {
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self
            .accumulation_queue
            .insert(validity, /*row_number=*/ 0, num_rows)
        {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    // Gets the total number of items covered by an array of offsets (keeping in
    // mind that the first offset may not be zero)
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }

    // This is where we do the work to actually shift the offsets and encode nulls
    // Note that the output is u64 and the input could be i32 OR i64.
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        // The offset of this list into the destination
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // If we want to take offsets spanning start..X and shift them so they span
                // base..(base + X - start) then we need to add (base - start) to all elements.
                // Note that `modifier` may be negative but (item + modifier) will always be >= 0
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            // Add `modifier` so the offsets continue from `base`
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                // If we want to take offsets spanning start..X and shift them so they span
                // base..(base + X - start) then we need to add (base - start) to all elements.
                // Note that `modifier` may be negative but (item + modifier) will always be >= 0
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }

    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }

    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        // See encodings.proto for reasoning behind this value
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}
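
// Minimal sketches of the offset "stitching" and null encoding described in the
// ListOffsetsEncoder docs, exercising extend_offsets_vec_u64 directly.  The
// null_offset_adjustment values here are chosen for illustration.
#[cfg(test)]
mod list_offsets_encoder_tests {
    use super::*;

    #[test]
    fn stitches_offsets() {
        // Stitch [0, 2, 3] and [0, 3]; the leading zero is implicit and never stored
        let mut dest = Vec::new();
        let first = Int32Array::from(vec![0, 2, 3]);
        ListOffsetsEncoder::extend_offsets_vec_u64(&mut dest, &first, None, 0, 1000);
        assert_eq!(dest, vec![2, 3]);
        let second = Int32Array::from(vec![0, 3]);
        ListOffsetsEncoder::extend_offsets_vec_u64(&mut dest, &second, None, 3, 1000);
        assert_eq!(dest, vec![2, 3, 6]);
    }

    #[test]
    fn encodes_nulls_as_oversized_offsets() {
        // Offsets [0, 20, 20, 20, 30] where the list at index 2 is null; with
        // null_offset_adjustment = 31 the stored offset becomes 20 + 31 = 51
        let mut dest = Vec::new();
        let offsets = Int32Array::from(vec![0, 20, 20, 20, 30]);
        let validity = BooleanArray::from(vec![true, true, false, true]);
        ListOffsetsEncoder::extend_offsets_vec_u64(&mut dest, &offsets, Some(&validity), 0, 31);
        assert_eq!(dest, vec![20, 20, 51, 30]);
    }
}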

pub struct ListFieldEncoder {
    offsets_encoder: ListOffsetsEncoder,
    items_encoder: Box<dyn FieldEncoder>,
}

impl ListFieldEncoder {
    pub fn new(
        items_encoder: Box<dyn FieldEncoder>,
        inner_offsets_encoder: Arc<dyn ArrayEncoder>,
        cache_bytes_per_columns: u64,
        keep_original_array: bool,
        column_index: u32,
    ) -> Self {
        Self {
            offsets_encoder: ListOffsetsEncoder::new(
                cache_bytes_per_columns,
                keep_original_array,
                column_index,
                inner_offsets_encoder,
            ),
            items_encoder,
        }
    }

    fn combine_tasks(
        offsets_tasks: Vec<EncodeTask>,
        item_tasks: Vec<EncodeTask>,
    ) -> Result<Vec<EncodeTask>> {
        let mut all_tasks = offsets_tasks;
        all_tasks.extend(item_tasks);
        Ok(all_tasks)
    }
}

impl FieldEncoder for ListFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // The list may have an offset / shorter length which means the underlying
        // values array could be longer than what we need to encode and so we need
        // to slice down to the region of interest.
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            // An items page cannot currently be shared by two different offsets pages.  This is
            // a limitation in the current scheduler and could be addressed in the future.  As a result
            // we always need to encode the items page if we encode the offsets page.
            //
            // In practice this isn't usually too bad unless we are targeting very small pages.
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns() + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}