lance_encoding/previous/encodings/logical/list.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::VecDeque,
    ops::Range,
    sync::{Arc, OnceLock},
};

use arrow_array::{
    Array, ArrayRef, BooleanArray, Int32Array, Int64Array, LargeListArray, ListArray, UInt64Array,
    cast::AsArray,
    new_empty_array,
    types::{Int32Type, Int64Type, UInt64Type},
};
use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer};
use arrow_schema::{DataType, Field, Fields};
use futures::{FutureExt, future::BoxFuture};
use lance_core::{Error, Result, cache::LanceCache, utils::parse::str_is_truthy};
use log::trace;
use tokio::task::JoinHandle;

use crate::{
    EncodingsIo,
    buffer::LanceBuffer,
    data::{BlockInfo, DataBlock, FixedWidthDataBlock},
    decoder::{
        DecodeArrayTask, DecodeBatchScheduler, FilterExpression, ListPriorityRange, MessageType,
        NextDecodeTask, PageEncoding, PriorityRange, ScheduledScanLine, SchedulerContext,
    },
    encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
    format::pb,
    previous::{
        decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob},
        encoder::{ArrayEncoder, EncodedArray},
        encodings::logical::r#struct::{SimpleStructDecoder, SimpleStructScheduler},
    },
    repdef::RepDefBuilder,
    utils::accumulation::AccumulationQueue,
};

/// When set, indirect I/O in the 2.0 list scheduler bypasses the backpressure system.
///
/// This can be a blunt instrument to avoid deadlocks in 2.0 scenarios.
/// Set LANCE_BYPASS_INDIRECT_IO_BACKPRESSURE=1 to enable.
static BYPASS_INDIRECT_IO_BACKPRESSURE: OnceLock<bool> = OnceLock::new();

fn bypass_indirect_io_backpressure() -> bool {
    *BYPASS_INDIRECT_IO_BACKPRESSURE.get_or_init(|| {
        std::env::var("LANCE_BYPASS_INDIRECT_IO_BACKPRESSURE")
            .map(|val| str_is_truthy(&val))
            .unwrap_or(false)
    })
}

// Scheduling lists is tricky.  Imagine the following scenario:
//
// * There are 2000 offsets per offsets page
// * The user requests range 8000..8500
//
// First, since 8000 matches the start of an offsets page, we don't need to read an extra offset.
//
// Since this range matches the start of a page, we know we will get an offsets array like
// [0, ...]
//
// We need to restore nulls, which relies on a null offset adjustment, which is unique to each offsets
// page.
// We need to map this to [X, ...] where X is the sum of the number of items in the 0-2000, 2000-4000,
// 4000-6000, and 6000-8000 pages.
//
// This gets even trickier if a range spans multiple offsets pages.  For example, given the same
// scenario but the user requests 7999..8500.  In this case the first page read will include an
// extra offset (e.g. we need to read 7998..8000), the null adjustment will be different between the
// two, and the items offset will be different.
//
// To handle this, we take the incoming row requests, look at the page info, and then calculate
// list requests.

#[derive(Debug)]
struct ListRequest {
    /// How many lists this request maps to
    num_lists: u64,
    /// Did this request include an extra offset
    includes_extra_offset: bool,
    /// The null offset adjustment for this request
    null_offset_adjustment: u64,
    /// items offset to apply
    items_offset: u64,
}

#[derive(Debug)]
struct ListRequestsIter {
    // The `includes_extra_offset` bool in each request tracks whether we need to skip an extra offset or not
    list_requests: VecDeque<ListRequest>,
    offsets_requests: Vec<Range<u64>>,
}

impl ListRequestsIter {
    // TODO: This logic relies on row_ranges being ordered and may be a problem when we
    // add proper support for out-of-order take
    fn new(row_ranges: &[Range<u64>], page_infos: &[OffsetPageInfo]) -> Self {
        let mut items_offset = 0;
        let mut offsets_offset = 0;
        let mut page_infos_iter = page_infos.iter();
        let mut cur_page_info = page_infos_iter.next().unwrap();
        let mut list_requests = VecDeque::new();
        let mut offsets_requests = Vec::new();

        // Each row range maps to at least one list request.  It may map to more if the
        // range spans multiple offsets pages.
        for range in row_ranges {
            let mut range = range.clone();

            // Skip any offsets pages that are before the range
            while offsets_offset + (cur_page_info.offsets_in_page) <= range.start {
                trace!("Skipping null offset adjustment chunk {:?}", offsets_offset);
                offsets_offset += cur_page_info.offsets_in_page;
                items_offset += cur_page_info.num_items_referenced_by_page;
                cur_page_info = page_infos_iter.next().unwrap();
            }

            // If the range starts at the beginning of an offsets page we don't need
            // to read an extra offset
            let mut includes_extra_offset = range.start != offsets_offset;
            if includes_extra_offset {
                offsets_requests.push(range.start - 1..range.end);
            } else {
                offsets_requests.push(range.clone());
            }

            // At this point our range overlaps the current page (cur_page_info) and
            // we can start slicing it into list requests
            while !range.is_empty() {
                // The end of the list request is the min of the end of the range
                // and the end of the current page
                let end = offsets_offset + cur_page_info.offsets_in_page;
                let last = end >= range.end;
                let end = end.min(range.end);
                list_requests.push_back(ListRequest {
                    num_lists: end - range.start,
                    includes_extra_offset,
                    null_offset_adjustment: cur_page_info.null_offset_adjustment,
                    items_offset,
                });

                includes_extra_offset = false;
                range.start = end;
                // If there is still more data in the range, we need to move to the
                // next page
                if !last {
                    offsets_offset += cur_page_info.offsets_in_page;
                    items_offset += cur_page_info.num_items_referenced_by_page;
                    cur_page_info = page_infos_iter.next().unwrap();
                }
            }
        }
        Self {
            list_requests,
            offsets_requests,
        }
    }

    // Given a page of offset data, grab the corresponding list requests
    fn next(&mut self, mut num_offsets: u64) -> Vec<ListRequest> {
        let mut list_requests = Vec::new();
        while num_offsets > 0 {
            let req = self.list_requests.front_mut().unwrap();
            // If the request did not start at zero then we need to read an extra offset
            if req.includes_extra_offset {
                num_offsets -= 1;
                debug_assert_ne!(num_offsets, 0);
            }
            if num_offsets >= req.num_lists {
                num_offsets -= req.num_lists;
                list_requests.push(self.list_requests.pop_front().unwrap());
            } else {
                let sub_req = ListRequest {
                    num_lists: num_offsets,
                    includes_extra_offset: req.includes_extra_offset,
                    null_offset_adjustment: req.null_offset_adjustment,
                    items_offset: req.items_offset,
                };

                list_requests.push(sub_req);
                req.includes_extra_offset = false;
                req.num_lists -= num_offsets;
                num_offsets = 0;
            }
        }
        list_requests
    }
}
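
// A minimal sketch (added for illustration, not part of the original file) that walks through
// the scenario described at the top of this file: pages of 2000 offsets each and a request for
// rows 7999..8500.  The per-page item counts below are made-up numbers chosen only to keep the
// arithmetic easy to follow (each page references 10_000 items, so its null_offset_adjustment
// is 10_001, mirroring the items-in-page + 1 rule used by the encoder below).
#[cfg(test)]
mod list_requests_iter_sketch {
    use super::*;

    #[test]
    fn splits_request_across_offsets_pages() {
        let page_infos = (0..5)
            .map(|_| OffsetPageInfo {
                offsets_in_page: 2000,
                null_offset_adjustment: 10_001,
                num_items_referenced_by_page: 10_000,
            })
            .collect::<Vec<_>>();

        let iter = ListRequestsIter::new(&[7999_u64..8500], &page_infos);

        // 7999 is not a page boundary so we read one extra offset (7998..8500)
        assert_eq!(iter.offsets_requests, vec![7998_u64..8500]);
        // The request is split into one list request per offsets page it touches
        assert_eq!(iter.list_requests.len(), 2);
        let first = &iter.list_requests[0];
        assert_eq!(first.num_lists, 1);
        assert!(first.includes_extra_offset);
        // Three full pages of offsets (0..6000) precede the page containing offset 7999,
        // so 30_000 items come before it
        assert_eq!(first.items_offset, 30_000);
        let second = &iter.list_requests[1];
        assert_eq!(second.num_lists, 500);
        assert!(!second.includes_extra_offset);
        assert_eq!(second.items_offset, 40_000);
    }
}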

/// Given a list of offsets and a list of requested list row ranges we need to rewrite the offsets so that
/// they appear as expected for a list array.  This involves a number of tasks:
///
///  * Nulls in the offsets are represented by oversize values and these need to be converted to
///    the appropriate length
///  * For each range we (usually) load N + 1 offsets, so if we have 5 ranges we have 5 extra values
///    and we need to drop 4 of those.
///  * Ranges may not start at 0 and, while we don't strictly need to, we want to go ahead and normalize
///    the offsets so that the first offset is 0.
///
/// Throughout the comments we will consider the following example case:
///
/// The user requests the following ranges of lists (list_row_ranges): [0..3, 5..6]
///
/// This is a total of 4 lists.  The loaded offsets are [10, 20, 120, 150, 60].  The last valid offset is 99.
/// The null_offset_adjustment will be 100.
///
/// Our desired output offsets are going to be [0, 10, 20, 20, 30] and the item ranges are [0..20] and [50..60]
/// The validity array is [true, true, false, true]
fn decode_offsets(
    offsets: &dyn Array,
    list_requests: &[ListRequest],
    null_offset_adjustment: u64,
) -> (VecDeque<Range<u64>>, Vec<u64>, BooleanBuffer) {
    // In our example this is [10, 20, 120, 150, 60]
    let numeric_offsets = offsets.as_primitive::<UInt64Type>();
    // In our example there are 4 total lists
    let total_num_lists = list_requests.iter().map(|req| req.num_lists).sum::<u64>() as u32;
    let mut normalized_offsets = Vec::with_capacity(total_num_lists as usize);
    let mut validity_buffer = BooleanBufferBuilder::new(total_num_lists as usize);
    // The first output offset is always 0 no matter what
    normalized_offsets.push(0);
    let mut last_normalized_offset = 0;
    let offsets_values = numeric_offsets.values();

    let mut item_ranges = VecDeque::new();
    let mut offsets_offset: u32 = 0;
    // All ranges should be non-empty
    debug_assert!(list_requests.iter().all(|r| r.num_lists > 0));
    for req in list_requests {
        // The # of lists in this particular range
        let num_lists = req.num_lists;

        // Because we know the first offset is always 0 we don't store that.  This means we have special
        // logic if a range starts at 0 (we didn't need to read an extra offset value in that case)
        // In our example we enter this special case on the first range (0..3) but not the second (5..6)
        // This means the first range, which has 3 lists, maps to 3 values in our offsets array [10, 20, 120]
        // However, the second range, which has 1 list, maps to 2 values in our offsets array [150, 60]
        let (items_range, offsets_to_norm_start, num_offsets_to_norm) =
            if !req.includes_extra_offset {
                // In our example items start is 0 and items_end is 20
                let first_offset_idx = 0_usize;
                let num_offsets = num_lists as usize;
                let items_start = 0;
                let items_end = offsets_values[num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            } else {
                // In our example, offsets_offset will be 3, items_start will be 50, and items_end will
                // be 60
                let first_offset_idx = offsets_offset as usize;
                let num_offsets = num_lists as usize + 1;
                let items_start = offsets_values[first_offset_idx] % null_offset_adjustment;
                let items_end =
                    offsets_values[first_offset_idx + num_offsets - 1] % null_offset_adjustment;
                let items_range = items_start..items_end;
                (items_range, first_offset_idx, num_offsets)
            };

        // TODO: Maybe consider writing whether there are nulls or not as part of the
        // page description.  Then we can skip all validity work.  Not clear if that will
        // be any benefit though.

        // We calculate validity from all elements but the first (or all elements
        // if this is the special zero-start case)
        //
        // So, in our first pass through, we consider [10, 20, 120] (1 null)
        // In our second pass through we only consider [60] (0 nulls)
        // Note that the 150 is null but we only loaded it to know where the 50-60 list started
        // and it doesn't actually correspond to a list (e.g. list 4 is null but we aren't loading it
        // here)
        let validity_start = if !req.includes_extra_offset {
            0
        } else {
            offsets_to_norm_start + 1
        };
        for off in offsets_values
            .slice(validity_start, num_lists as usize)
            .iter()
        {
            validity_buffer.append(*off < null_offset_adjustment);
        }

        // In our special case we also need to account for the first offset (the 0..first_item list)
        if !req.includes_extra_offset {
            let first_item = offsets_values[0] % null_offset_adjustment;
            normalized_offsets.push(first_item);
            last_normalized_offset = first_item;
        }

        // Finally, we go through and shift the offsets.  If we just returned them as is (taking care of
        // nulls) we would get [0, 10, 20, 20, 60] but our last list only has 10 items, not 40, and so we
        // need to shift that 60 to a 30.
        normalized_offsets.extend(
                offsets_values
                    .slice(offsets_to_norm_start, num_offsets_to_norm)
                    .windows(2)
                    .map(|w| {
                        let start = w[0] % null_offset_adjustment;
                        let end = w[1] % null_offset_adjustment;
                        if end < start {
                            panic!("End is less than start in window {:?} with null_offset_adjustment={} we get start={} and end={}", w, null_offset_adjustment, start, end);
                        }
                        let length = end - start;
                        last_normalized_offset += length;
                        last_normalized_offset
                    }),
            );
        trace!(
            "List offsets range of {} lists maps to item range {:?}",
            num_lists, items_range
        );
        offsets_offset += num_offsets_to_norm as u32;
        if !items_range.is_empty() {
            let items_range =
                items_range.start + req.items_offset..items_range.end + req.items_offset;
            item_ranges.push_back(items_range);
        }
    }

    let validity = validity_buffer.finish();
    (item_ranges, normalized_offsets, validity)
}
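
// A minimal sketch (added for illustration, not part of the original file) that runs the exact
// example from the comment above: loaded offsets [10, 20, 120, 150, 60], a null_offset_adjustment
// of 100, and requested list ranges [0..3, 5..6].
#[cfg(test)]
mod decode_offsets_sketch {
    use super::*;

    #[test]
    fn documented_example() {
        let offsets = UInt64Array::from(vec![10, 20, 120, 150, 60]);
        let list_requests = vec![
            // 0..3 starts at the beginning of the offsets page so no extra offset was loaded
            ListRequest {
                num_lists: 3,
                includes_extra_offset: false,
                null_offset_adjustment: 100,
                items_offset: 0,
            },
            // 5..6 does not, so one extra offset (the 150) was loaded
            ListRequest {
                num_lists: 1,
                includes_extra_offset: true,
                null_offset_adjustment: 100,
                items_offset: 0,
            },
        ];

        let (item_ranges, normalized_offsets, validity) =
            decode_offsets(&offsets, &list_requests, 100);

        assert_eq!(normalized_offsets, vec![0, 10, 20, 20, 30]);
        assert_eq!(item_ranges, VecDeque::from(vec![0_u64..20, 50..60]));
        assert_eq!(
            validity.iter().collect::<Vec<_>>(),
            vec![true, true, false, true]
        );
    }
}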

/// After scheduling the offsets we immediately launch this task as a new tokio task.
/// This task waits for the offsets to arrive, decodes them, and then schedules the I/O
/// for the items.
///
/// This task does not wait for the items data.  That happens in the main decode loop (unless
/// we have list of list of ... in which case it happens in the outer indirect decode loop)
#[allow(clippy::too_many_arguments)]
async fn indirect_schedule_task(
    mut offsets_decoder: Box<dyn LogicalPageDecoder>,
    list_requests: Vec<ListRequest>,
    null_offset_adjustment: u64,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_type: DataType,
    io: Arc<dyn EncodingsIo>,
    cache: Arc<LanceCache>,
    priority: Box<dyn PriorityRange>,
) -> Result<IndirectlyLoaded> {
    let num_offsets = offsets_decoder.num_rows();
    // We know the offsets are a primitive array and thus will not need additional
    // pages
    offsets_decoder.wait_for_loaded(num_offsets - 1).await?;
    let decode_task = offsets_decoder.drain(num_offsets)?;
    let (offsets, _) = decode_task.task.decode()?;

    let (item_ranges, offsets, validity) =
        decode_offsets(offsets.as_ref(), &list_requests, null_offset_adjustment);

    trace!(
        "Indirectly scheduling items ranges {:?} from list items column with {} rows (and priority {:?})",
        item_ranges,
        items_scheduler.num_rows(),
        priority
    );
    let offsets: Arc<[u64]> = offsets.into();

    // All requested lists are empty
    if item_ranges.is_empty() {
        debug_assert!(item_ranges.iter().all(|r| r.start == r.end));
        return Ok(IndirectlyLoaded {
            root_decoder: None,
            offsets,
            validity,
        });
    }
    let item_ranges = item_ranges.into_iter().collect::<Vec<_>>();
    let num_items = item_ranges.iter().map(|r| r.end - r.start).sum::<u64>();

    // Create a new root scheduler, which has one column, which is our items data
    let root_fields = Fields::from(vec![Field::new("item", items_type, true)]);
    let indirect_root_scheduler =
        SimpleStructScheduler::new(vec![items_scheduler], root_fields.clone(), num_items);
    #[allow(deprecated)]
    let mut indirect_scheduler = DecodeBatchScheduler::from_scheduler(
        Arc::new(indirect_root_scheduler),
        root_fields.clone(),
        cache,
    );
    let mut root_decoder = SimpleStructDecoder::new(root_fields, num_items);

    let priority = Box::new(ListPriorityRange::new(priority, offsets.clone()));

    let indirect_messages = indirect_scheduler.schedule_ranges_to_vec(
        &item_ranges,
        // Can't push filters into list items
        &FilterExpression::no_filter(),
        io,
        Some(priority),
    )?;

    for message in indirect_messages {
        for decoder in message.decoders {
            let decoder = decoder.into_legacy();
            if !decoder.path.is_empty() {
                root_decoder.accept_child(decoder)?;
            }
        }
    }

    Ok(IndirectlyLoaded {
        offsets,
        validity,
        root_decoder: Some(root_decoder),
    })
}

#[derive(Debug)]
struct ListFieldSchedulingJob<'a> {
    scheduler: &'a ListFieldScheduler,
    offsets: Box<dyn SchedulingJob + 'a>,
    num_rows: u64,
    list_requests_iter: ListRequestsIter,
}

impl<'a> ListFieldSchedulingJob<'a> {
    fn try_new(
        scheduler: &'a ListFieldScheduler,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Self> {
        let list_requests_iter = ListRequestsIter::new(ranges, &scheduler.offset_page_info);
        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
        let offsets = scheduler
            .offsets_scheduler
            .schedule_ranges(&list_requests_iter.offsets_requests, filter)?;
        Ok(Self {
            scheduler,
            offsets,
            list_requests_iter,
            num_rows,
        })
    }
}

impl SchedulingJob for ListFieldSchedulingJob<'_> {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_offsets = self.offsets.schedule_next(context, priority)?;
        let offsets_scheduled = next_offsets.rows_scheduled;
        let list_reqs = self.list_requests_iter.next(offsets_scheduled);
        trace!(
            "Scheduled {} offsets which maps to list requests: {:?}",
            offsets_scheduled, list_reqs
        );
        let null_offset_adjustment = list_reqs[0].null_offset_adjustment;
        // It shouldn't be possible for `list_reqs` to span more than one offsets page and so it shouldn't
        // be possible for the null_offset_adjustment to change
        debug_assert!(
            list_reqs
                .iter()
                .all(|req| req.null_offset_adjustment == null_offset_adjustment)
        );
        let num_rows = list_reqs.iter().map(|req| req.num_lists).sum::<u64>();
        // The offsets column is uint64, which is guaranteed to create exactly one decoder on each call to schedule_next
        let next_offsets_decoder = next_offsets
            .decoders
            .into_iter()
            .next()
            .unwrap()
            .into_legacy()
            .decoder;

        let items_scheduler = self.scheduler.items_scheduler.clone();
        let items_type = self.scheduler.items_field.data_type().clone();
        let base_io = context.io().clone();
        let io = if bypass_indirect_io_backpressure() {
            base_io.with_bypass_backpressure().unwrap_or(base_io)
        } else {
            base_io
        };
        let cache = context.cache().clone();

        // Immediately spawn the indirect scheduling
        let indirect_fut = tokio::spawn(indirect_schedule_task(
            next_offsets_decoder,
            list_reqs,
            null_offset_adjustment,
            items_scheduler,
            items_type,
            io,
            cache,
            priority.box_clone(),
        ));

        // Return a decoder
        let decoder = Box::new(ListPageDecoder {
            offsets: Arc::new([]),
            validity: BooleanBuffer::new(Buffer::from_vec(Vec::<u8>::default()), 0, 0),
            item_decoder: None,
            rows_drained: 0,
            rows_loaded: 0,
            items_field: self.scheduler.items_field.clone(),
            num_rows,
            unloaded: Some(indirect_fut),
            offset_type: self.scheduler.offset_type.clone(),
            data_type: self.scheduler.list_type.clone(),
        });
        #[allow(deprecated)]
        let decoder = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder)],
            rows_scheduled: num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// A page scheduler for list fields that encodes offsets in one field and items in another
///
/// The list scheduler is somewhat unique because it requires indirect I/O.  We cannot know the
/// ranges we need simply by looking at the metadata.  This means that list scheduling doesn't
/// fit neatly into the two-thread schedule-loop / decode-loop model.  To handle this, when a
/// list page is scheduled, we only schedule the I/O for the offsets and then we immediately
/// launch a new tokio task.  This new task waits for the offsets, decodes them, and then
/// schedules the I/O for the items.  Keep in mind that list items can be lists themselves.  If
/// that is the case then this indirection will continue.  The decode task that is returned will
/// only finish `wait`ing when all of the I/O has completed.
///
/// Whenever we schedule follow-up I/O like this the priority is based on the top-level row
/// index.  This helps ensure that earlier rows get finished completely (including follow up
/// tasks) before we perform I/O for later rows.
#[derive(Debug)]
pub struct ListFieldScheduler {
    offsets_scheduler: Arc<dyn FieldScheduler>,
    items_scheduler: Arc<dyn FieldScheduler>,
    items_field: Arc<Field>,
    offset_type: DataType,
    list_type: DataType,
    offset_page_info: Vec<OffsetPageInfo>,
}

/// The offsets are stored in a uint64 encoded column.  For each page we
/// store some supplementary data that helps us understand the offsets.
/// This is needed to construct the scheduler
#[derive(Debug)]
pub struct OffsetPageInfo {
    pub offsets_in_page: u64,
    pub null_offset_adjustment: u64,
    pub num_items_referenced_by_page: u64,
}

impl ListFieldScheduler {
    /// Create a new ListFieldScheduler
    pub fn new(
        offsets_scheduler: Arc<dyn FieldScheduler>,
        items_scheduler: Arc<dyn FieldScheduler>,
        items_field: Arc<Field>,
        // Should be int32 or int64
        offset_type: DataType,
        offset_page_info: Vec<OffsetPageInfo>,
    ) -> Self {
        let list_type = match &offset_type {
            DataType::Int32 => DataType::List(items_field.clone()),
            DataType::Int64 => DataType::LargeList(items_field.clone()),
            _ => panic!("Unexpected offset type {}", offset_type),
        };
        Self {
            offsets_scheduler,
            items_scheduler,
            items_field,
            offset_type,
            offset_page_info,
            list_type,
        }
    }
}

impl FieldScheduler for ListFieldScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        Ok(Box::new(ListFieldSchedulingJob::try_new(
            self, ranges, filter,
        )?))
    }

    fn num_rows(&self) -> u64 {
        self.offsets_scheduler.num_rows()
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        // 2.0 schedulers do not need to initialize
        std::future::ready(Ok(())).boxed()
    }
}

/// As soon as the first call to decode comes in we wait for all indirect I/O to
/// complete.
///
/// Once the indirect I/O is finished the offsets and validity have already been decoded
/// and the item decoder (if any items were scheduled) is taken out of `unloaded`.
///
/// We then drain from `item_decoder`, popping item pages off as we finish with
/// them.
///
/// TODO: Test the case where a single list page has multiple items pages
#[derive(Debug)]
struct ListPageDecoder {
    unloaded: Option<JoinHandle<Result<IndirectlyLoaded>>>,
    // offsets and validity will have already been decoded as part of the indirect I/O
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    item_decoder: Option<SimpleStructDecoder>,
    num_rows: u64,
    rows_drained: u64,
    rows_loaded: u64,
    items_field: Arc<Field>,
    offset_type: DataType,
    data_type: DataType,
}

struct ListDecodeTask {
    offsets: Vec<u64>,
    validity: BooleanBuffer,
    // Will be None if there are no items (all empty / null lists)
    items: Option<Box<dyn DecodeArrayTask>>,
    items_field: Arc<Field>,
    offset_type: DataType,
}

impl DecodeArrayTask for ListDecodeTask {
    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
        let items = self
            .items
            .map(|items| {
                // When we run the indirect I/O we wrap things in a struct array with a single field
                // named "item".  We can unwrap that now.
                let (wrapped_items, _) = items.decode()?;
                Result::Ok(wrapped_items.as_struct().column(0).clone())
            })
            .unwrap_or_else(|| Ok(new_empty_array(self.items_field.data_type())))?;

        // The offsets are already decoded but they need to be shifted back to 0 and cast
        // to the appropriate type
        //
        // Note that, in some cases, the shift IS strictly required since the unshifted offsets
        // may cross i32::MAX even though the shifted offsets do not
        let offsets = UInt64Array::from(self.offsets);
        let validity = NullBuffer::new(self.validity);
        let validity = if validity.null_count() == 0 {
            None
        } else {
            Some(validity)
        };
        let min_offset = UInt64Array::new_scalar(offsets.value(0));
        let offsets = arrow_arith::numeric::sub(&offsets, &min_offset)?;
        let array: ArrayRef = match &self.offset_type {
            DataType::Int32 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int32)?;
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let offsets = OffsetBuffer::new(offsets_i32.values().clone());

                Arc::new(ListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?)
            }
            DataType::Int64 => {
                let offsets = arrow_cast::cast(&offsets, &DataType::Int64)?;
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let offsets = OffsetBuffer::new(offsets_i64.values().clone());

                Arc::new(LargeListArray::try_new(
                    self.items_field.clone(),
                    offsets,
                    items,
                    validity,
                )?)
            }
            _ => panic!("ListDecodeTask with data type that is not i32 or i64"),
        };
        // data_size is only tracked in the v2.1 structural decode path; the legacy
        // v2.0 path does not need it so we return 0.
        Ok((array, 0))
    }
}

// Helper method that performs binary search.  However, once the
// target is found it walks past any duplicates.  E.g. if the
// input list is [0, 3, 5, 5, 5, 7] then this will only return
// 0, 1, 4, or 5.
fn binary_search_to_end(to_search: &[u64], target: u64) -> u64 {
    let mut result = match to_search.binary_search(&target) {
        Ok(idx) => idx,
        Err(idx) => idx - 1,
    };
    while result < (to_search.len() - 1) && to_search[result + 1] == target {
        result += 1;
    }
    result as u64
}
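
// A minimal sketch (added for illustration, not part of the original file) pinning down the
// documented behavior on the example input [0, 3, 5, 5, 5, 7]: runs of duplicates are walked
// past so only the last occurrence of a run can be returned.
#[cfg(test)]
mod binary_search_to_end_sketch {
    use super::*;

    #[test]
    fn walks_past_duplicates() {
        let offsets = [0_u64, 3, 5, 5, 5, 7];
        assert_eq!(binary_search_to_end(&offsets, 0), 0);
        assert_eq!(binary_search_to_end(&offsets, 3), 1);
        // Any of the three 5s matches the search but we always land on the last one
        assert_eq!(binary_search_to_end(&offsets, 5), 4);
        assert_eq!(binary_search_to_end(&offsets, 7), 5);
        // Targets that fall between two offsets resolve to the offset below them
        assert_eq!(binary_search_to_end(&offsets, 4), 1);
        assert_eq!(binary_search_to_end(&offsets, 6), 4);
    }
}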

impl LogicalPageDecoder for ListPageDecoder {
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            // wait for the indirect I/O to finish, run the scheduler for the indirect
            // I/O and then wait for enough items to arrive
            if self.unloaded.is_some() {
                trace!("List scheduler needs to wait for indirect I/O to complete");
                let indirectly_loaded = self.unloaded.take().unwrap().await;
                if let Err(err) = indirectly_loaded {
                    match err.try_into_panic() {
                        Ok(err) => std::panic::resume_unwind(err),
                        Err(err) => panic!("{:?}", err),
                    };
                }
                let indirectly_loaded = indirectly_loaded.unwrap()?;

                self.offsets = indirectly_loaded.offsets;
                self.validity = indirectly_loaded.validity;
                self.item_decoder = indirectly_loaded.root_decoder;
            }
            if self.rows_loaded > loaded_need {
                return Ok(());
            }

            let boundary = loaded_need as usize;
            debug_assert!(boundary < self.num_rows as usize);
            // We need more than X lists which means we need at least X+1 lists which means
            // we need at least offsets[X+1] items which means we need more than offsets[X+1]-1 items.
            let items_needed = self.offsets[boundary + 1].saturating_sub(1);
            trace!(
                "List decoder is waiting for more than {} rows to be loaded and {}/{} are already loaded.  To satisfy this we need more than {} loaded items",
                loaded_need,
                self.rows_loaded,
                self.num_rows,
                items_needed,
            );

            let items_loaded = if let Some(item_decoder) = self.item_decoder.as_mut() {
                item_decoder.wait_for_loaded(items_needed).await?;
                item_decoder.rows_loaded()
            } else {
                0
            };

            self.rows_loaded = binary_search_to_end(&self.offsets, items_loaded);
            trace!("List decoder now has {} loaded rows", self.rows_loaded);

            Ok(())
        }
        .boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        // We already have the offsets but need to drain the item pages
        let mut actual_num_rows = num_rows;
        let item_start = self.offsets[self.rows_drained as usize];
        if self.offset_type != DataType::Int64 {
            // We might not be able to drain `num_rows` because that request might contain more than 2^31 items
            // so we need to figure out how many rows we can actually drain.
            while actual_num_rows > 0 {
                let num_items =
                    self.offsets[(self.rows_drained + actual_num_rows) as usize] - item_start;
                if num_items <= i32::MAX as u64 {
                    break;
                }
                // TODO: This could be slow.  Maybe faster to start from zero or do binary search.  Investigate when
                // actually adding support for smaller than requested batches
                actual_num_rows -= 1;
            }
        }
        if actual_num_rows < num_rows {
            // TODO: We should be able to automatically
            // shrink the read batch size if we detect the batches are going to be huge (maybe
            // even achieve this with a read_batch_bytes parameter, though some estimation may
            // still be required)
            return Err(Error::not_supported_source(format!("loading a batch of {} lists would require creating an array with over i32::MAX items and we don't yet support returning smaller than requested batches", num_rows).into()));
        }
        let offsets = self.offsets
            [self.rows_drained as usize..(self.rows_drained + actual_num_rows + 1) as usize]
            .to_vec();
        let validity = self
            .validity
            .slice(self.rows_drained as usize, actual_num_rows as usize);
        let start = offsets[0];
        let end = offsets[offsets.len() - 1];
        let num_items_to_drain = end - start;

        let item_decode = if num_items_to_drain == 0 {
            None
        } else {
            self.item_decoder
                .as_mut()
                .map(|item_decoder| Result::Ok(item_decoder.drain(num_items_to_drain)?.task))
                .transpose()?
        };

        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(ListDecodeTask {
                offsets,
                validity,
                items_field: self.items_field.clone(),
                items: item_decode,
                offset_type: self.offset_type.clone(),
            }) as Box<dyn DecodeArrayTask>,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

struct IndirectlyLoaded {
    offsets: Arc<[u64]>,
    validity: BooleanBuffer,
    root_decoder: Option<SimpleStructDecoder>,
}

impl std::fmt::Debug for IndirectlyLoaded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndirectlyLoaded")
            .field("offsets", &self.offsets)
            .field("validity", &self.validity)
            .finish()
    }
}

/// An encoder for list offsets that "stitches" offsets and encodes nulls into the offsets
///
/// If we need to encode several list arrays into a single page then we need to "stitch" the offsets
/// For example, imagine we have list arrays [[0, 1], [2]] and [[3, 4, 5]].
///
/// We will have offset arrays [0, 2, 3] and [0, 3].  We don't want to encode [0, 2, 3, 0, 3].  What
/// we want is [0, 2, 3, 6]
///
/// This encoder also handles validity by converting a null value into an oversized offset.  For example,
/// if we have four lists with offsets [0, 20, 20, 20, 30] and the list at index 2 is null (note that
/// the list at index 1 is empty) then we turn this into offsets [0, 20, 20, 51, 30].  We replace a null
/// offset with previous_offset + max_offset + 1.  This makes it possible to load a single item from the
/// list array.
///
/// These offsets are always stored on disk as a u64 array.  First, this is because it's simply much more
/// likely than one expects that this is needed, even if our lists are not massive.  This is because we
/// only write an offsets page when we have enough data.  This means we will probably accumulate a million
/// offsets or more before we bother to write a page. If our lists have a few thousand items apiece then
/// we end up passing the u32::MAX boundary.
///
/// The second reason is that list offsets are very easily compacted with delta + bit packing and so those
/// u64 offsets should easily be shrunk down before being put on disk.
///
/// This encoder can encode both lists and large lists.  It can decode the resulting column into either type
/// as well. (TODO: Test and enable large lists)
///
/// You can even write as a large list and decode as a regular list (as long as no single list has more than
/// 2^31 items) or vice versa.  You could even encode a mixed stream of list and large list (but unclear that
/// would ever be useful)
#[derive(Debug)]
struct ListOffsetsEncoder {
    // An accumulation queue, we insert both offset arrays and validity arrays into this queue
    accumulation_queue: AccumulationQueue,
    // The inner encoder of offset values
    inner_encoder: Arc<dyn ArrayEncoder>,
    column_index: u32,
}

impl ListOffsetsEncoder {
    fn new(
        cache_bytes: u64,
        keep_original_array: bool,
        column_index: u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Self {
        Self {
            accumulation_queue: AccumulationQueue::new(
                cache_bytes,
                column_index,
                keep_original_array,
            ),
            inner_encoder,
            column_index,
        }
    }

    /// Given a list array, return the offsets as a standalone ArrayRef (either an Int32Array or Int64Array)
    fn extract_offsets(list_arr: &dyn Array) -> ArrayRef {
        match list_arr.data_type() {
            DataType::List(_) => {
                let offsets = list_arr.as_list::<i32>().offsets().clone();
                Arc::new(Int32Array::new(offsets.into_inner(), None))
            }
            DataType::LargeList(_) => {
                let offsets = list_arr.as_list::<i64>().offsets().clone();
                Arc::new(Int64Array::new(offsets.into_inner(), None))
            }
            _ => panic!(),
        }
    }

    /// Converts the validity of a list array into a boolean array.  If there is no validity information
    /// then this is an empty boolean array.
    fn extract_validity(list_arr: &dyn Array) -> ArrayRef {
        if let Some(validity) = list_arr.nulls() {
            Arc::new(BooleanArray::new(validity.inner().clone(), None))
        } else {
            // We convert None validity into an empty array because the accumulation queue can't
            // handle Option<ArrayRef>
            new_empty_array(&DataType::Boolean)
        }
    }

    fn make_encode_task(&self, arrays: Vec<ArrayRef>) -> EncodeTask {
        let inner_encoder = self.inner_encoder.clone();
        let column_idx = self.column_index;
        // At this point we should have 2*N arrays where the even-indexed arrays are integer offsets
        // and the odd-indexed arrays are boolean validity bitmaps
        let offset_arrays = arrays.iter().step_by(2).cloned().collect::<Vec<_>>();
        let validity_arrays = arrays.into_iter().skip(1).step_by(2).collect::<Vec<_>>();

        tokio::task::spawn(async move {
            let num_rows =
                offset_arrays.iter().map(|arr| arr.len()).sum::<usize>() - offset_arrays.len();
            let num_rows = num_rows as u64;
            let mut buffer_index = 0;
            let array = Self::do_encode(
                offset_arrays,
                validity_arrays,
                &mut buffer_index,
                num_rows,
                inner_encoder,
            )?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows,
                column_idx,
                row_number: 0, // Legacy encoders do not use this
            })
        })
        .map(|res_res| res_res.unwrap())
        .boxed()
    }

    fn maybe_encode_offsets_and_validity(&mut self, list_arr: &dyn Array) -> Option<EncodeTask> {
        let offsets = Self::extract_offsets(list_arr);
        let validity = Self::extract_validity(list_arr);
        let num_rows = offsets.len() as u64;
        // Either inserting the offsets OR inserting the validity could cause the
        // accumulation queue to fill up
        if let Some(mut arrays) = self
            .accumulation_queue
            .insert(offsets, /*row_number=*/ 0, num_rows)
        {
            arrays.0.push(validity);
            Some(self.make_encode_task(arrays.0))
        } else if let Some(arrays) = self
            .accumulation_queue
            .insert(validity, /*row_number=*/ 0, num_rows)
        {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    fn flush(&mut self) -> Option<EncodeTask> {
        if let Some(arrays) = self.accumulation_queue.flush() {
            Some(self.make_encode_task(arrays.0))
        } else {
            None
        }
    }

    // Gets the total number of items covered by an array of offsets (keeping in
    // mind that the first offset may not be zero)
    fn get_offset_span(array: &dyn Array) -> u64 {
        match array.data_type() {
            DataType::Int32 => {
                let arr_i32 = array.as_primitive::<Int32Type>();
                (arr_i32.value(arr_i32.len() - 1) - arr_i32.value(0)) as u64
            }
            DataType::Int64 => {
                let arr_i64 = array.as_primitive::<Int64Type>();
                (arr_i64.value(arr_i64.len() - 1) - arr_i64.value(0)) as u64
            }
            _ => panic!(),
        }
    }

    // This is where we do the work to actually shift the offsets and encode nulls
    // Note that the output is u64 and the input could be i32 OR i64.
    fn extend_offsets_vec_u64(
        dest: &mut Vec<u64>,
        offsets: &dyn Array,
        validity: Option<&BooleanArray>,
        // The offset of this list into the destination
        base: u64,
        null_offset_adjustment: u64,
    ) {
        match offsets.data_type() {
            DataType::Int32 => {
                let offsets_i32 = offsets.as_primitive::<Int32Type>();
                let start = offsets_i32.value(0) as u64;
                // If we want to take a list from start..X and change it into
                // a list starting at `base` then we need to add (base - start) to all elements
                // Note that `modifier` may be negative but (item + modifier) will always be >= 0
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off as i64 + modifier) as u64
                                    + (!valid as u64 * null_offset_adjustment)
                            }),
                    );
                } else {
                    dest.extend(
                        offsets_i32
                            .values()
                            .iter()
                            .skip(1)
                            // Subtract by `start` so offsets start at 0
                            .map(|&v| (v as i64 + modifier) as u64),
                    );
                }
            }
            DataType::Int64 => {
                let offsets_i64 = offsets.as_primitive::<Int64Type>();
                let start = offsets_i64.value(0) as u64;
                // If we want to take a list from start..X and change it into
                // a list starting at `base` then we need to add (base - start) to all elements
                // Note that `modifier` may be negative but (item + modifier) will always be >= 0
                let modifier = base as i64 - start as i64;
                if let Some(validity) = validity {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .zip(validity.values().iter())
                            .map(|(&off, valid)| {
                                (off + modifier) as u64 + (!valid as u64 * null_offset_adjustment)
                            }),
                    )
                } else {
                    dest.extend(
                        offsets_i64
                            .values()
                            .iter()
                            .skip(1)
                            .map(|&v| (v + modifier) as u64),
                    );
                }
            }
            _ => panic!("Invalid list offsets data type {:?}", offsets.data_type()),
        }
    }

    fn do_encode_u64(
        offset_arrays: Vec<ArrayRef>,
        validity: Vec<Option<&BooleanArray>>,
        num_offsets: u64,
        null_offset_adjustment: u64,
        buffer_index: &mut u32,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let mut offsets = Vec::with_capacity(num_offsets as usize);
        for (offsets_arr, validity_arr) in offset_arrays.iter().zip(validity) {
            let last_prev_offset = offsets.last().copied().unwrap_or(0) % null_offset_adjustment;
            Self::extend_offsets_vec_u64(
                &mut offsets,
                &offsets_arr,
                validity_arr,
                last_prev_offset,
                null_offset_adjustment,
            );
        }
        let offsets_data = DataBlock::FixedWidth(FixedWidthDataBlock {
            bits_per_value: 64,
            data: LanceBuffer::reinterpret_vec(offsets),
            num_values: num_offsets,
            block_info: BlockInfo::new(),
        });
        inner_encoder.encode(offsets_data, &DataType::UInt64, buffer_index)
    }

    fn do_encode(
        offset_arrays: Vec<ArrayRef>,
        validity_arrays: Vec<ArrayRef>,
        buffer_index: &mut u32,
        num_offsets: u64,
        inner_encoder: Arc<dyn ArrayEncoder>,
    ) -> Result<EncodedArray> {
        let validity_arrays = validity_arrays
            .iter()
            .map(|v| {
                if v.is_empty() {
                    None
                } else {
                    Some(v.as_boolean())
                }
            })
            .collect::<Vec<_>>();
        debug_assert_eq!(offset_arrays.len(), validity_arrays.len());
        let total_span = offset_arrays
            .iter()
            .map(|arr| Self::get_offset_span(arr.as_ref()))
            .sum::<u64>();
        // See encodings.proto for reasoning behind this value
        let null_offset_adjustment = total_span + 1;
        let encoded_offsets = Self::do_encode_u64(
            offset_arrays,
            validity_arrays,
            num_offsets,
            null_offset_adjustment,
            buffer_index,
            inner_encoder,
        )?;
        Ok(EncodedArray {
            data: encoded_offsets.data,
            encoding: pb::ArrayEncoding {
                array_encoding: Some(pb::array_encoding::ArrayEncoding::List(Box::new(
                    pb::List {
                        offsets: Some(Box::new(encoded_offsets.encoding)),
                        null_offset_adjustment,
                        num_items: total_span,
                    },
                ))),
            },
        })
    }
}
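
// A minimal sketch (added for illustration, not part of the original file) of the null-encoding
// example from the ListOffsetsEncoder comment above: the four lists with offsets
// [0, 20, 20, 20, 30], where list 2 is null, are stored as [20, 20, 51, 30] (the leading 0 is
// implicit and never written) when the null_offset_adjustment is 31 (30 items + 1).
#[cfg(test)]
mod list_offsets_encoder_sketch {
    use super::*;

    #[test]
    fn nulls_become_oversized_offsets() {
        let offsets = Int32Array::from(vec![0, 20, 20, 20, 30]);
        let validity = BooleanArray::from(vec![true, true, false, true]);
        let mut dest = Vec::new();
        ListOffsetsEncoder::extend_offsets_vec_u64(&mut dest, &offsets, Some(&validity), 0, 31);
        // The null list keeps its (empty) extent but its offset is pushed past the largest
        // valid offset: 20 + 31 = 51
        assert_eq!(dest, vec![20, 20, 51, 30]);
    }
}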

pub struct ListFieldEncoder {
    offsets_encoder: ListOffsetsEncoder,
    items_encoder: Box<dyn FieldEncoder>,
}

impl ListFieldEncoder {
    pub fn new(
        items_encoder: Box<dyn FieldEncoder>,
        inner_offsets_encoder: Arc<dyn ArrayEncoder>,
        cache_bytes_per_columns: u64,
        keep_original_array: bool,
        column_index: u32,
    ) -> Self {
        Self {
            offsets_encoder: ListOffsetsEncoder::new(
                cache_bytes_per_columns,
                keep_original_array,
                column_index,
                inner_offsets_encoder,
            ),
            items_encoder,
        }
    }

    fn combine_tasks(
        offsets_tasks: Vec<EncodeTask>,
        item_tasks: Vec<EncodeTask>,
    ) -> Result<Vec<EncodeTask>> {
        let mut all_tasks = offsets_tasks;
        all_tasks.extend(item_tasks);
        Ok(all_tasks)
    }
}

impl FieldEncoder for ListFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>> {
        // The list may have an offset / shorter length which means the underlying
        // values array could be longer than what we need to encode and so we need
        // to slice down to the region of interest.
        let items = match array.data_type() {
            DataType::List(_) => {
                let list_arr = array.as_list::<i32>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            DataType::LargeList(_) => {
                let list_arr = array.as_list::<i64>();
                let items_start = list_arr.value_offsets()[list_arr.offset()] as usize;
                let items_end =
                    list_arr.value_offsets()[list_arr.offset() + list_arr.len()] as usize;
                list_arr
                    .values()
                    .slice(items_start, items_end - items_start)
            }
            _ => panic!(),
        };
        let offsets_tasks = self
            .offsets_encoder
            .maybe_encode_offsets_and_validity(array.as_ref())
            .map(|task| vec![task])
            .unwrap_or_default();
        let mut item_tasks = self.items_encoder.maybe_encode(
            items,
            external_buffers,
            repdef,
            row_number,
            num_rows,
        )?;
        if !offsets_tasks.is_empty() && item_tasks.is_empty() {
            // An items page cannot currently be shared by two different offsets pages.  This is
            // a limitation in the current scheduler and could be addressed in the future.  As a result
            // we always need to encode the items page if we encode the offsets page.
            //
            // In practice this isn't usually too bad unless we are targeting very small pages.
            item_tasks = self.items_encoder.flush(external_buffers)?;
        }
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
        let offsets_tasks = self
            .offsets_encoder
            .flush()
            .map(|task| vec![task])
            .unwrap_or_default();
        let item_tasks = self.items_encoder.flush(external_buffers)?;
        Self::combine_tasks(offsets_tasks, item_tasks)
    }

    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns() + 1
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        let inner_columns = self.items_encoder.finish(external_buffers);
        async move {
            let mut columns = vec![EncodedColumn::default()];
            let inner_columns = inner_columns.await?;
            columns.extend(inner_columns);
            Ok(columns)
        }
        .boxed()
    }
    }
}