Skip to main content

lance_encoding/previous/encodings/logical/
list.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::VecDeque, ops::Range, sync::Arc};
5
6use arrow_array::{
7    Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
8    cast::AsArray,
9    new_empty_array,
10    types::{Int32Type, Int64Type, UInt64Type},
11};
12use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
13use arrow_schema::{DataType, Field, Fields};
14use futures::{FutureExt, future::BoxFuture};
15use lance_core::{Error, Result, cache::LanceCache};
16use log::trace;
17use tokio::task::JoinHandle;
18
19use crate::{
20    EncodingsIo,
21    buffer::LanceBuffer,
22    data::{BlockInfo, DataBlock, FixedWidthDataBlock},
23    decoder::{
24        DecodeArrayTask, DecodeBatchScheduler, FilterExpression, ListPriorityRange, MessageType,
25        NextDecodeTask, PageEncoding, PriorityRange, ScheduledScanLine, SchedulerContext,
26    },
27    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
28    format::pb,
29    previous::{
30        decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob},
31        encoder::{ArrayEncoder, EncodedArray},
32        encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler},
33    },
34    repdef::RepDefBuilder,
35    utils::accumulation::AccumulationQueue,
36};
37
38// Scheduling lists is tricky.  Imagine the following scenario:
39//
40// * There are 2000 offsets per offsets page
41// * The user requests range 8000..8500
42//
43// First, since 8000 matches the start of an offsets page, we don't need to read an extra offset.
44//
45// Since this range matches the start of a page, we know we will get an offsets array like
46// [0, ...]
47//
48// We need to restore nulls, which relies on a null offset adjustment, which is unique to each offsets
49// page.
50//
51// We need to map this to [X, ...] where X is the sum of the number of items in the 0-2000, 2000-4000,
52// and 4000-6000 pages.
53//
54// This gets even trickier if a range spans multiple offsets pages.  For example, given the same
55// scenario but the user requests 7999..8500.  In this case the first page read will include an
56// extra offset (e.g. we need to read 7998..8000), the null adjustment will be different between the
57// two, and the items offset will be different.
58//
59// To handle this, we take the incoming row requests, look at the page info, and then calculate
60// list requests.
61
/// A single contiguous run of requested lists that falls entirely within one
/// offsets page.
///
/// Produced by [`ListRequestsIter`], which slices each user-requested row
/// range into one of these per offsets page the range touches.
#[derive(Debug)]
struct ListRequest {
    /// How many lists this request maps to
    num_lists: u64,
    /// Did this request include an extra offset (true when the request does
    /// not begin exactly at the start of an offsets page)
    includes_extra_offset: bool,
    /// The null offset adjustment for this request (taken from the offsets
    /// page the request falls in)
    null_offset_adjustment: u64,
    /// items offset to apply (sum of items referenced by all skipped pages)
    items_offset: u64,
}
73
/// Converts user-requested row ranges into per-page [`ListRequest`]s plus the
/// ranges of the offsets column that must be read to satisfy them.
///
/// (The `includes_extra_offset` bool on each request records whether an extra
/// leading offset was read and must be skipped during decode.)
#[derive(Debug)]
struct ListRequestsIter {
    // Requests not yet handed out by `next`, in request order
    list_requests: VecDeque<ListRequest>,
    // Ranges of the offsets column to schedule, one per input row range
    offsets_requests: Vec<Range<u64>>,
}
80
impl ListRequestsIter {
    // TODO: This logic relies on row_ranges being ordered and may be a problem when we
    // add proper support for out-of-order take
    //
    // Walks the (ordered) row ranges and the offsets-page infos in lockstep,
    // emitting one ListRequest per (range, page) intersection and recording
    // which offsets ranges need to be read.
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        // Running totals for pages we have fully passed over
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        // Each row range maps to at least one list request.  It may map to more if the
        // range spans multiple offsets pages.
        for range in row_ranges {
            let mut range = range.clone();

            // Skip any offsets pages that are before the range
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range starts at the beginning of an offsets page we don't need
            // to read an extra offset
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                // Read one extra offset before the range so the start of the
                // first list can be computed
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // At this point our range overlaps the current page (cur_page_info) and
            // we can start slicing it into list requests
            while !range.is_empty() {
                // The end of the list request is the min of the end of the range
                // and the end of the current page
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                // Only the first slice of a range can carry the extra offset
                includes_extra_offset = false;
                range.start = end;
                // If there is still more data in the range, we need to move to the
                // next page
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    // Given a page of offset data, grab the corresponding list requests
    //
    // Pops whole requests covered by `num_offsets` scheduled offsets; if the
    // page ends partway through a request, the request is split and the
    // remainder stays at the front of the queue (without the extra offset,
    // which has already been consumed).
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            // If the request did not start at zero then we need to read an extra offset
            if req.includes_extra_offset {
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                // The page fully covers this request
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                // The page covers only part of this request; split it
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}
176
/// Given a list of offsets and a list of requested list row ranges we need to rewrite the offsets so that
/// they appear as expected for a list array.  This involves a number of tasks:
///
///  * Nulls in the offsets are represented by oversize values and these need to be converted to
///    the appropriate length
///  * For each range we (usually) load N + 1 offsets, so if we have 5 ranges we have 5 extra values
///    and we need to drop 4 of those.
///  * Ranges may not start at 0 and, while we don't strictly need to, we want to go ahead and normalize
///    the offsets so that the first offset is 0.
///
/// Throughout the comments we will consider the following example case:
///
/// The user requests the following ranges of lists (list_row_ranges): [0..3, 5..6]
///
/// This is a total of 4 lists.  The loaded offsets are [10, 20, 120, 150, 60].  The last valid offset is 99.
/// The null_offset_adjustment will be 100.
///
/// Our desired output offsets are going to be [0, 10, 20, 20, 30] and the item ranges are [0..20] and [50..60]
/// The validity array is [true, true, false, true]
///
/// Returns (item ranges to load, normalized offsets [with a leading 0], per-list validity)
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    // In our example this is [10, 20, 120, 150, 60]
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    // In our example there are 4 total lists
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    // The first output offset is always 0 no matter what
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    let mut offsets_offset: u32 = 0;
    // All ranges should be non-empty
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        // The # of lists in this particular range
        let num_lists = req.num_lists;

        // Because we know the first offset is always 0 we don't store that.  This means we have special
        // logic if a range starts at 0 (we didn't need to read an extra offset value in that case)
        // In our example we enter this special case on the first range (0..3) but not the second (5..6)
        // This means the first range, which has 3 lists, maps to 3 values in our offsets array [10, 20, 120]
        // However, the second range, which has 1 list, maps to 2 values in our offsets array [150, 60]
        //
        // Note: `% null_offset_adjustment` recovers the real offset from a
        // null-encoded (oversize) value
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                // In our example items start is 0 and items_end is 20
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                // In our example, offsets_offset will be 3, items_start will be 50, and items_end will
                // be 60
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // TODO: Maybe consider writing whether there are nulls or not as part of the
        // page description.  Then we can skip all validity work.  Not clear if that will
        // be any benefit though.

        // We calculate validity from all elements but the first (or all elements
        // if this is the special zero-start case)
        //
        // So, in our first pass through, we consider [10, 20, 120] (1 null)
        // In our second pass through we only consider [60] (0 nulls)
        // Note that the 150 is null but we only loaded it to know where the 50-60 list started
        // and it doesn't actually correspond to a list (e.g. list 4 is null but we aren't loading it
        // here)
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            // An offset >= null_offset_adjustment encodes a null list
            validity_buffer.append(*off < null_offset_adjustment);
        }

        // In our special case we need to account for the offset 0-first_item
        if !req.includes_extra_offset {
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        // Finally, we go through and shift the offsets.  If we just returned them as is (taking care of
        // nulls) we would get [0, 10, 20, 20, 60] but our last list only has 10 items, not 40 and so we
        // need to shift that 60 to a 40.
        normalized_offsets.extend(
                offsets_values
                    .slice(offsets_to_norm_start, num_offsets_to_norm)
                    .windows(2)
                    .map(|w| {
                        let start = w[0] % null_offset_adjustment;
                        let end = w[1] % null_offset_adjustment;
                        if end < start {
                            panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                        }
                        let length = end - start;
                        last_normalized_offset += length;
                        last_normalized_offset
                    }),
            );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists, items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        if !items_range.is_empty() {
            // Shift the item range by the items referenced by earlier pages so
            // it indexes into the full items column
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}
310
/// After scheduling the offsets we immediately launch this task as a new tokio task
/// This task waits for the offsets to arrive, decodes them, and then schedules the I/O
/// for the items.
///
/// This task does not wait for the items data.  That happens on the main decode loop (unless
/// we have list of list of ... in which case it happens in the outer indirect decode loop)
///
/// Returns the decoded offsets/validity plus (when any items exist) a root
/// decoder that the item pages have been fed into.
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // We know the offsets are a primitive array and thus will not need additional
    // pages.  We can use a dummy receiver to match the decoder API
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let offsets = decode_task.task.decode()?;

    // Normalize the raw offsets and compute which item ranges we need
    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // All requested lists are empty
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Create a new root scheduler, which has one column, which is our items data
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
    #[allow(deprecated)]
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    // Wrap the incoming priority so item I/O is prioritized by top-level row
    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        // Can't push filters into list items
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    // Feed every scheduled child decoder into the root decoder (the root
    // itself has an empty path and is skipped)
    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}
395
/// A scheduling job created by one call to `ListFieldScheduler::schedule_ranges`
#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    /// The inner job scheduling the offsets column
    offsets: Box<dyn SchedulingJob + 'a>,
    /// Total number of lists requested across all ranges
    num_rows: u64,
    /// Maps scheduled offsets back to per-page list requests
    list_requests_iter: ListRequestsIter,
}
403
404impl<'a> ListFieldSchedulingJob<'a> {
405    fn try_new(
406        scheduler: &'a ListFieldScheduler,
407        ranges: &[Range<u64>],
408        filter: &FilterExpression,
409    ) -> Result<Self> {
410        let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
411        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
412        let offsets = scheduler
413            .offsets_scheduler
414            .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
415        Ok(Self {
416            scheduler,
417            offsets,
418            list_requests_iter,
419            num_rows,
420        })
421    }
422}
423
impl SchedulingJob for ListFieldSchedulingJob<'_> {
    /// Schedules the next chunk of offsets, spawns the indirect task that will
    /// schedule the corresponding items, and returns a `ListPageDecoder` that
    /// will be populated once that task completes.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled, list_reqs
        );
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        // It shouldn't be possible for `list_reqs` to span more than one offsets page and so it shouldn't
        // be possible for the null_offset_adjustment to change
        debug_assert!(
            list_reqs
                .iter()
                .all(|req| req.null_offset_adjustment == null_offset_adjustment)
        );
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // offsets is a uint64 which is guaranteed to create one decoder on each call to schedule_next
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let io = context.io().clone();
        let cache = context.cache().clone();

        // Immediately spawn the indirect scheduling
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // Return a decoder.  The offsets/validity here are empty placeholders;
        // the real values are filled in by `wait_for_loaded` when the spawned
        // indirect task (`unloaded`) finishes.
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        #[allow(deprecated)]
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
497
/// A page scheduler for list fields that encodes offsets in one field and items in another
///
/// The list scheduler is somewhat unique because it requires indirect I/O.  We cannot know the
/// ranges we need simply by looking at the metadata.  This means that list scheduling doesn't
/// fit neatly into the two-thread schedule-loop / decode-loop model.  To handle this, when a
/// list page is scheduled, we only schedule the I/O for the offsets and then we immediately
/// launch a new tokio task.  This new task waits for the offsets, decodes them, and then
/// schedules the I/O for the items.  Keep in mind that list items can be lists themselves.  If
/// that is the case then this indirection will continue.  The decode task that is returned will
/// only finish `wait`ing when all of the I/O has completed.
///
/// Whenever we schedule follow-up I/O like this the priority is based on the top-level row
/// index.  This helps ensure that earlier rows get finished completely (including follow up
/// tasks) before we perform I/O for later rows.
#[derive(Debug)]
pub struct ListFieldScheduler {
    /// Scheduler for the (uint64-encoded) offsets column
    offsets_scheduler: Arc<dyn FieldScheduler>,
    /// Scheduler for the flattened items column
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    /// Int32 or Int64; selects List vs LargeList output
    offset_type: DataType,
    /// Cached List/LargeList data type derived from `offset_type` in `new`
    list_type: DataType,
    /// Per-page supplementary info needed to interpret the offsets
    offset_page_info: Vec<OffsetPageInfo>,
}
521
/// The offsets are stored in a uint64 encoded column.  For each page we
/// store some supplementary data that helps us understand the offsets.
/// This is needed to construct the scheduler
#[derive(Debug)]
pub struct OffsetPageInfo {
    /// Number of offsets (i.e. lists) in this page
    pub offsets_in_page: u64,
    /// Null-encoding threshold for this page: any stored offset >= this value
    /// represents a null list (`offset % null_offset_adjustment` recovers the
    /// real offset)
    pub null_offset_adjustment: u64,
    /// Total number of items referenced by the offsets in this page
    pub num_items_referenced_by_page: u64,
}
531
532impl ListFieldScheduler {
533    // Create a new ListPageScheduler
534    pub fn new(
535        offsets_scheduler: Arc<dyn FieldScheduler>,
536        items_scheduler: Arc<dyn FieldScheduler>,
537        items_field: Arc<Field>,
538        // Should be int32 or int64
539        offset_type: DataType,
540        offset_page_info: Vec<OffsetPageInfo>,
541    ) -> Self {
542        let list_type = match &offset_type {
543            DataType::Int32 => DataType::List(items_field.clone()),
544            DataType::Int64 => DataType::LargeList(items_field.clone()),
545            _ => panic!("Unexpected offset type {}", offset_type),
546        };
547        Self {
548            offsets_scheduler,
549            items_scheduler,
550            items_field,
551            offset_type,
552            offset_page_info,
553            list_type,
554        }
555    }
556}
557
impl FieldScheduler for ListFieldScheduler {
    /// Create a job that schedules the offsets now and the items indirectly
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(ListFieldSchedulingJob::try_new(
            self, ranges, filter,
        )?))
    }

    fn num_rows(&self) -> u64 {
        // The offsets column has one row per list, so its row count is the
        // number of lists
        self.offsets_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // 2.0 schedulers do not need to initialize
        std::future::ready(Ok(())).boxed()
    }
}
582
/// As soon as the first call to decode comes in we wait for all indirect I/O to
/// complete.
///
/// Once the indirect I/O is finished we pull items out of `unawaited`, wait them
/// (this wait should return immediately) and then push them into `item_decoders`.
///
/// We then drain from `item_decoders`, popping item pages off as we finish with
/// them.
///
/// TODO: Test the case where a single list page has multiple items pages
#[derive(Debug)]
struct ListPageDecoder {
    /// Handle to the spawned indirect scheduling task; taken (set to None) on
    /// the first call to `wait_for_loaded`
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // offsets and validity will have already been decoded as part of the indirect I/O
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    /// None until the indirect task completes, and also None if all requested
    /// lists were empty / null
    item_decoder: Option<SimpleStructDecoder>,
    /// Total number of lists this decoder covers
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    /// Int32 or Int64; selects List vs LargeList output
    offset_type: DataType,
    data_type: DataType,
}
607
/// Decode task that assembles a List/LargeList array from already-decoded
/// offsets/validity and an (optional) items decode task.
struct ListDecodeTask {
    /// Decoded offsets for this batch; not necessarily zero-based (they are
    /// re-zeroed and cast during `decode`)
    offsets: Vec<u64>,
    /// Per-list validity (true = non-null)
    validity: BooleanBuffer,
    // Will be None if there are no items (all empty / null lists)
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    /// Int32 or Int64; selects List vs LargeList output
    offset_type: DataType,
}
616
impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let items = self
            .items
            .map(|items| {
                // When we run the indirect I/O we wrap things in a struct array with a single field
                // named "item".  We can unwrap that now.
                let wrapped_items = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            // No items at all (every list empty or null) -> empty items array
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        // The offsets are already decoded but they need to be shifted back to 0 and cast
        // to the appropriate type
        //
        // Although, in some cases, the shift IS strictly required since the unshifted offsets
        // may cross i32::MAX even though the shifted offsets do not
        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        // Arrow treats a None null buffer as "all valid"; drop it when possible
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        // Subtract the first offset from all offsets so they start at 0
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Ok(Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Ok(Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?))
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        }
    }
}
672
/// Binary search that, when `target` occurs multiple times, returns the index
/// of the LAST duplicate.  E.g. if the input list is [0, 3, 5, 5, 5, 7] then
/// this will only return 0, 1, 4, or 5.  If `target` is absent, returns the
/// index of the largest element smaller than `target`.
///
/// Callers pass offsets arrays that always begin with 0 and a non-negative
/// target, so a predecessor always exists.  Out-of-contract inputs (an empty
/// slice, or a target below the first element) now saturate to 0 instead of
/// underflowing `idx - 1` / `len() - 1` and panicking as the previous
/// implementation did.
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    // `partition_point` returns the number of elements <= target, i.e. one
    // past the last duplicate of `target` (or the insertion point when the
    // target is absent), so subtracting one lands exactly on the last
    // duplicate / the predecessor.  This replaces the old binary_search +
    // walk-forward loop in one pass.
    let past_target = to_search.partition_point(|&val| val <= target);
    past_target.saturating_sub(1) as u64
}
687
688impl LogicalPageDecoder for ListPageDecoder {
    /// Waits until strictly more than `loaded_need` lists are loaded.
    ///
    /// On the first call this also awaits the spawned indirect task and
    /// installs the decoded offsets/validity and the item decoder.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            // wait for the indirect I/O to finish, run the scheduler for the indirect
            // I/O and then wait for enough items to arrive
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if let Err(err) = indirectly_loaded {
                    // A JoinError means the indirect task panicked (or was
                    // cancelled); resume panics here to preserve the payload
                    match err.try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                // Install the results decoded by the indirect task
                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            // "Loaded" means strictly more than `loaded_need` rows
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            // We need more than X lists which means we need at least X+1 lists which means
            // we need at least offsets[X+1] items which means we need more than offsets[X+1]-1 items.
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded.  To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            // item_decoder is None when every requested list was empty/null, in
            // which case zero items are (and ever will be) loaded
            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            // Map loaded items back to loaded lists, walking past duplicate
            // offsets so trailing empty lists count as loaded too
            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }
739
    /// Drains `num_rows` lists from this decoder, producing the next decode task.
    ///
    /// The offsets are already in memory (loaded during scheduling / indirect I/O);
    /// only the child item pages still need to be drained.  If the target offset
    /// type is 32 bits and the requested lists would span more than `i32::MAX`
    /// items, this returns an error rather than silently shrinking the batch.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // We already have the offsets but need to drain the item pages
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            // We might not be able to drain `num_rows` because that request might contain more than 2^31 items
            // so we need to figure out how many rows we can actually drain.
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                // TODO: This could be slow.  Maybe faster to start from zero or do binary search.  Investigate when
                // actually adding support for smaller than requested batches
                actual_num_rows -= 1;
            }
        }
        if actual_num_rows < num_rows {
            // TODO: We should be able to automatically
            // shrink the read batch size if we detect the batches are going to be huge (maybe
            // even achieve this with a read_batch_bytes parameter, though some estimation may
            // still be required)
            return Err(Error::not_supported_source(format!("loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches", num_rows).into()));
        }
        // Copy out the offsets for the drained range (one extra offset for the final
        // list boundary) and the matching slice of validity
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        // If every drained list is empty (or null) there are no items to drain
        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        // At this point actual_num_rows == num_rows (we returned an error otherwise)
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }
796
    /// Total number of rows (lists) this decoder covers.
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
800
    /// Number of rows whose item data has been loaded so far (and can thus be drained).
    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }
804
    /// Number of rows already drained from this decoder.
    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }
808
    /// The Arrow data type of the list array this decoder produces.
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
812}
813
/// Result of the indirect I/O phase of list scheduling: the decoded offsets, the
/// list validity, and a decoder for the item pages (presumably `None` when there
/// are no items to decode — TODO confirm at construction site).
struct IndirectlyLoaded {
    // Decoded list offsets
    offsets: Arc<[u64]>,
    // Per-list validity bitmap
    validity: BooleanBuffer,
    // Decoder for the child item pages
    root_decoder: Option<SimpleStructDecoder>,
}
819
820impl std::fmt::Debug for IndirectlyLoaded {
821    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
822        f.debug_struct("IndirectlyLoaded")
823            .field("offsets", &self.offsets)
824            .field("validity", &self.validity)
825            .finish()
826    }
827}
828
829/// An encoder for list offsets that "stitches" offsets and encodes nulls into the offsets
830///
831/// If we need to encode several list arrays into a single page then we need to "stitch" the offsets
832/// For example, imagine we have list arrays [[0, 1], [2]] and [[3, 4, 5]].
833///
834/// We will have offset arrays [0, 2, 3] and [0, 3].  We don't want to encode [0, 2, 3, 0, 3].  What
835/// we want is [0, 2, 3, 6]
836///
837/// This encoder also handles validity by converting a null value into an oversized offset.  For example,
838/// if we have four lists with offsets [0, 20, 20, 20, 30] and the list at index 2 is null (note that
839/// the list at index 1 is empty) then we turn this into offsets [0, 20, 20, 51, 30].  We replace a null
840/// offset with previous_offset + max_offset + 1.  This makes it possible to load a single item from the
841/// list array.
842///
/// These offsets are always stored on disk as a u64 array.  First, this is because it's simply much more
844/// likely than one expects that this is needed, even if our lists are not massive.  This is because we
845/// only write an offsets page when we have enough data.  This means we will probably accumulate a million
/// offsets or more before we bother to write a page. If our lists have a few thousand items apiece then
847/// we end up passing the u32::MAX boundary.
848///
849/// The second reason is that list offsets are very easily compacted with delta + bit packing and so those
850/// u64 offsets should easily be shrunk down before being put on disk.
851///
852/// This encoder can encode both lists and large lists.  It can decode the resulting column into either type
853/// as well. (TODO: Test and enable large lists)
854///
855/// You can even write as a large list and decode as a regular list (as long as no single list has more than
856/// 2^31 items) or vice versa.  You could even encode a mixed stream of list and large list (but unclear that
857/// would ever be useful)
#[derive(Debug)]
struct ListOffsetsEncoder {
    // An accumulation queue, we insert both offset arrays and validity arrays into this queue
    accumulation_queue: AccumulationQueue,
    // The inner encoder of offset values
    inner_encoder: Arc<dyn ArrayEncoder>,
    // Index of the column the encoded offsets pages are written to
    column_index: u32,
}
866
impl ListOffsetsEncoder {
    /// Creates a new offsets encoder that accumulates up to `cache_bytes` of
    /// offsets/validity data before emitting an encode task.
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }

    /// Given a list array, return the offsets as a standalone ArrayRef (either an Int32Array or Int64Array)
    ///
    /// Panics if `list_arr` is not a list or large list array.
    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }

    /// Converts the validity of a list array into a boolean array.  If there is no validity information
    /// then this is an empty boolean array.
    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            // We convert None validity into an empty array because the accumulation queue can't
            // handle Option<ArrayRef>
            new_empty_array(&DataType::Boolean)
        }
    }

    /// Spawns a task that encodes the accumulated (offsets, validity) pairs into a
    /// single offsets page.
    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        // At this point we should have 2*N arrays where the even-indexed arrays are integer offsets
        // and the odd-indexed arrays are boolean validity bitmaps
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();

        tokio::task::spawn(async move {
            // Each offsets array has one more offset than it has lists, so subtract one
            // per array to get the total list count
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                row_number: 0, // Legacy encoders do not use
            })
        })
        // NOTE: unwrap re-raises a panic from the spawned task on the awaiting side
        .map(|res_res| res_res.unwrap())
        .boxed()
    }

    /// Extracts offsets & validity from `list_arr` and inserts them into the
    /// accumulation queue, returning an encode task if either insert fills the queue.
    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        let num_rows = offsets.len() as u64;
        // Either inserting the offsets OR inserting the validity could cause the
        // accumulation queue to fill up
        if let Some(mut arrays) = self
            .accumulation_queue
            .insert(offsets, /*row_number=*/ 0, num_rows)
        {
            // Preserve the even=offsets / odd=validity pairing expected by
            // make_encode_task by appending the validity that accompanies the
            // offsets array which triggered the flush
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self
            .accumulation_queue
            .insert(validity, /*row_number=*/ 0, num_rows)
        {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    /// Flushes any accumulated but not-yet-encoded offsets/validity into an encode task.
    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    // Gets the total number of items covered by an array of offsets (keeping in
    // mind that the first offset may not be zero)
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }

    // This is where we do the work to actually shift the offsets and encode nulls
    // Note that the output is u64 and the input could be i32 OR i64.
    //
    // The leading offset of each input array is dropped (skip(1)); the destination
    // only records the end offset of each list.  A null list is marked by adding
    // `null_offset_adjustment` to its end offset.
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        // The offset of this list into the destination
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // If we want to take a list from start..X and change it into
                // a list from end..X then we need to add (base - start) to all elements
                // Note that `modifier` may be negative but (item + modifier) will always be >= 0
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            // Subtract by `start` so offsets start at 0
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                // If we want to take a list from start..X and change it into
                // a list from end..X then we need to add (base - start) to all elements
                // Note that `modifier` may be negative but (item + modifier) will always be >= 0
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }

    /// Stitches the offset arrays into one contiguous u64 offsets vector (folding
    /// validity into the offsets) and encodes it with the inner encoder.
    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            // The previous offset may have been bumped by `null_offset_adjustment` to mark
            // a null list.  Every real offset is < null_offset_adjustment (which is
            // total_span + 1) so the modulo recovers the true item position to use as the
            // base for this array.
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }

    /// Encodes N (offsets, validity) array pairs into a single list-offsets page,
    /// wrapping the inner encoding in a `pb::List` description that records the
    /// `null_offset_adjustment` and total item count needed for decoding.
    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        // An empty validity array (see extract_validity) means the list array had no nulls
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        // See encodings.proto for reasoning behind this value
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}
1138
/// Field encoder for list arrays: writes one column of "stitched" offsets
/// followed by the columns produced by the items encoder.
pub struct ListFieldEncoder {
    // Encodes the offsets column (validity is folded into the offsets)
    offsets_encoder: ListOffsetsEncoder,
    // Encodes the child items column(s)
    items_encoder: Box<dyn FieldEncoder>,
}
1143
1144impl ListFieldEncoder {
1145    pub fn new(
1146        items_encoder: Box<dyn FieldEncoder>,
1147        inner_offsets_encoder: Arc<dyn ArrayEncoder>,
1148        cache_bytes_per_columns: u64,
1149        keep_original_array: bool,
1150        column_index: u32,
1151    ) -> Self {
1152        Self {
1153            offsets_encoder: ListOffsetsEncoder::new(
1154                cache_bytes_per_columns,
1155                keep_original_array,
1156                column_index,
1157                inner_offsets_encoder,
1158            ),
1159            items_encoder,
1160        }
1161    }
1162
1163    fn combine_tasks(
1164        offsets_tasks: Vec<EncodeTask>,
1165        item_tasks: Vec<EncodeTask>,
1166    ) -> Result<Vec<EncodeTask>> {
1167        let mut all_tasks = offsets_tasks;
1168        let item_tasks = item_tasks;
1169        all_tasks.extend(item_tasks);
1170        Ok(all_tasks)
1171    }
1172}
1173
impl FieldEncoder for ListFieldEncoder {
    /// Accumulates one list array: offsets/validity are routed to the offsets
    /// encoder and the referenced item values are routed to the items encoder.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // The list may have an offset / shorter length which means the underlying
        // values array could be longer than what we need to encode and so we need
        // to slice down to the region of interest.
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            // Only (large) list arrays should be routed to this encoder
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            // An items page cannot currently be shared by two different offsets pages.  This is
            // a limitation in the current scheduler and could be addressed in the future.  As a result
            // we always need to encode the items page if we encode the offsets page.
            //
            // In practice this isn't usually too bad unless we are targeting very small pages.
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    /// Flushes both the offsets encoder and the items encoder.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    /// One column for the offsets plus however many columns the items need.
    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns() + 1
    }

    /// Finishes encoding.  The offsets column (index 0) gets a default entry;
    /// the remaining columns come from the items encoder.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
}