lance_encoding/previous/encodings/logical/struct.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{BinaryHeap, VecDeque},
6    ops::Range,
7    sync::Arc,
8};
9
10use crate::{
11    decoder::{
12        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
13        ScheduledScanLine, SchedulerContext,
14    },
15    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
16};
17use arrow_array::{ArrayRef, StructArray};
18use arrow_schema::{DataType, Field, Fields};
19use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
20use lance_core::{Error, Result};
21use log::trace;
22use snafu::location;
23
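
/// Tracks the scheduling progress of one child column.  These are kept in a
/// min-heap keyed on `rows_scheduled` (see the `Ord` impl below) so that the
/// least-scheduled child is always scheduled next.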
#[derive(Debug)]
struct SchedulingJobWithStatus<'a> {
    col_idx: u32,
    col_name: &'a str,
    job: Box<dyn SchedulingJob + 'a>,
    rows_scheduled: u64,
    rows_remaining: u64,
}

impl PartialEq for SchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for SchedulingJobWithStatus<'_> {}

impl PartialOrd for SchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Note this is reversed to make it min-heap
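        // `BinaryHeap` is a max-heap, so the reversed comparison turns it into
        // a min-heap.  A minimal standalone sketch of the same idea
        // (illustrative only, using `std::cmp::Reverse` instead of a custom
        // `Ord`):
        //
        //     use std::{cmp::Reverse, collections::BinaryHeap};
        //     let mut heap = BinaryHeap::new();
        //     heap.push(Reverse(3u64));
        //     heap.push(Reverse(1u64));
        //     heap.push(Reverse(2u64));
        //     assert_eq!(heap.pop(), Some(Reverse(1))); // smallest value first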
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}
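
/// Decode task that produces a `StructArray` with zero fields and `num_rows` rows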
#[derive(Debug)]
struct EmptyStructDecodeTask {
    num_rows: u64,
}

impl DecodeArrayTask for EmptyStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        Ok(Arc::new(StructArray::new_empty_fields(
            self.num_rows as usize,
            None,
        )))
    }
}
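
/// Decoder for a struct with no child fields.  There is no data to wait for;
/// it only tracks how many rows have been drained.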
#[derive(Debug)]
struct EmptyStructDecoder {
    num_rows: u64,
    rows_drained: u64,
    data_type: DataType,
}

impl EmptyStructDecoder {
    fn new(num_rows: u64) -> Self {
        Self {
            num_rows,
            rows_drained: 0,
            data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
        }
    }
}

impl LogicalPageDecoder for EmptyStructDecoder {
    fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        Box::pin(std::future::ready(Ok(())))
    }
    fn rows_loaded(&self) -> u64 {
        self.num_rows
    }
    fn rows_unloaded(&self) -> u64 {
        0
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(EmptyStructDecodeTask { num_rows }),
        })
    }
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
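
/// Scheduling job for a struct with no child columns; it emits a single
/// [`EmptyStructDecoder`] covering all rows.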
#[derive(Debug)]
struct EmptyStructSchedulerJob {
    num_rows: u64,
}

impl SchedulingJob for EmptyStructSchedulerJob {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        _priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
        #[allow(deprecated)]
        let struct_decoder = context.locate_decoder(empty_decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(struct_decoder)],
            rows_scheduled: self.num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// Scheduling job for struct data
///
/// The order in which we schedule the children is important.  We want to schedule the child
/// that is furthest behind (the one with the fewest rows scheduled so far) first.
///
/// This allows us to decode entire rows as quickly as possible
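///
/// For example, if child `a` stores 100 rows per page and child `b` stores 10
/// rows per page, a single call to `schedule_next` may schedule one page of
/// `a` (100 rows) and then one page of `b` (10 rows).  The job then reports 10
/// rows scheduled (the minimum across children) and later calls keep popping
/// `b`, the least-scheduled child, until it catches up.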
#[derive(Debug)]
struct SimpleStructSchedulerJob<'a> {
    scheduler: &'a SimpleStructScheduler,
    /// A min-heap whose key is the # of rows currently scheduled
    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
    rows_scheduled: u64,
    num_rows: u64,
    initialized: bool,
}

impl<'a> SimpleStructSchedulerJob<'a> {
    fn new(
        scheduler: &'a SimpleStructScheduler,
        children: Vec<Box<dyn SchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let children = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| SchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect::<BinaryHeap<_>>();
        Self {
            scheduler,
            children,
            rows_scheduled: 0,
            num_rows,
            initialized: false,
        }
    }
}

impl SchedulingJob for SimpleStructSchedulerJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let mut decoders = Vec::new();
        if !self.initialized {
            // Send info to the decoder thread so it knows a struct is here.  In the future we will also
            // send validity info here.
            let struct_decoder = Box::new(SimpleStructDecoder::new(
                self.scheduler.child_fields.clone(),
                self.num_rows,
            ));
            #[allow(deprecated)]
            let struct_decoder = context.locate_decoder(struct_decoder);
            decoders.push(MessageType::DecoderReady(struct_decoder));
            self.initialized = true;
        }
        let old_rows_scheduled = self.rows_scheduled;
        // Schedule as many children as we need to until we have scheduled at least one
        // complete row
        while old_rows_scheduled == self.rows_scheduled {
            let mut next_child = self.children.pop().unwrap();
            trace!("Scheduling more rows for child {}", next_child.col_idx);
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled,
                next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// A scheduler for structs
///
/// The implementation is actually a bit more tricky than one might initially think.  We can't
/// just go through and schedule each column one after the other.  This would mean our decode
/// can't start until nearly all the data has arrived (since we need data from each column).
///
/// Instead, we schedule in row-major fashion.
///
/// Note: this scheduler is the starting point for all decoding.  This is because we treat the
/// top-level record batch as a non-nullable struct.
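///
/// "Row-major" here means pages from different columns are interleaved so
/// that the set of scheduled rows always forms a complete prefix of the data.
/// Scheduling column-by-column would instead force the decoder to buffer
/// nearly the whole file before the first row could be emitted.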
#[derive(Debug)]
pub struct SimpleStructScheduler {
    children: Vec<Arc<dyn FieldScheduler>>,
    child_fields: Fields,
    num_rows: u64,
}

impl SimpleStructScheduler {
    pub fn new(
        children: Vec<Arc<dyn FieldScheduler>>,
        child_fields: Fields,
        num_rows: u64,
    ) -> Self {
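        // If there are no children, fall back to the `num_rows` the caller
        // provided; otherwise trust the children (which must all agree, see
        // the debug_assert below).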
        let num_rows = children
            .first()
            .map(|child| child.num_rows())
            .unwrap_or(num_rows);
        debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
        Self {
            children,
            child_fields,
            num_rows,
        }
    }
}

impl FieldScheduler for SimpleStructScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        if self.children.is_empty() {
            return Ok(Box::new(EmptyStructSchedulerJob {
                num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
            }));
        }
        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_schedulers[0].num_rows();
        Ok(Box::new(SimpleStructSchedulerJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let futures = self
            .children
            .iter()
            .map(|child| child.initialize(filter, context))
            .collect::<FuturesUnordered<_>>();
        async move {
            futures
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}
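
// A rough sketch of how a caller might drive this scheduler (the `context`
// and `priority` values are assumed to come from the surrounding decode
// infrastructure and are not constructed here; not a complete example):
//
//     let scheduler = SimpleStructScheduler::new(children, child_fields, num_rows);
//     let mut job = scheduler.schedule_ranges(&[0..num_rows], &FilterExpression::no_filter())?;
//     let mut rows_scheduled = 0;
//     while rows_scheduled < job.num_rows() {
//         let scan_line = job.schedule_next(&mut context, priority)?;
//         rows_scheduled += scan_line.rows_scheduled;
//         // Hand scan_line.decoders off to the decode stream...
//     }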

#[derive(Debug)]
struct ChildState {
    // As child decoders are scheduled they are added to this queue
    // Once the decoder is fully drained it is popped from this queue
    //
    // TODO: It may be a minor perf optimization, in some rare cases, if we have a separate
    // "fully awaited but not yet drained" queue so we don't loop through fully awaited pages
    // during each call to wait.
    //
    // Note: This queue may have more than one page in it if the batch size is very large
    // or pages are very small
    // TODO: Test this case
    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
    // Rows that have been awaited
    rows_loaded: u64,
    // Rows that have been drained
    rows_drained: u64,
    // Rows that have been popped (the decoder has been completely drained and removed from `scheduled`)
    rows_popped: u64,
    // Total number of rows in the struct
    num_rows: u64,
    // The field index in the struct (used for debugging / logging)
    field_index: u32,
}

impl ChildState {
    fn new(num_rows: u64, field_index: u32) -> Self {
        Self {
            scheduled: VecDeque::new(),
            rows_loaded: 0,
            rows_drained: 0,
            rows_popped: 0,
            num_rows,
            field_index,
        }
    }

    // Wait for the next set of rows to arrive
    //
    // Waits until strictly more than `loaded_need` rows are loaded, stopping
    // as soon as we go above that limit.
    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        trace!(
            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
            self.field_index,
            loaded_need,
            self.rows_loaded,
        );
        let mut fully_loaded = self.rows_popped;
        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
            if next_decoder.rows_unloaded() > 0 {
                let mut current_need = loaded_need;
                current_need -= fully_loaded;
                let rows_in_page = next_decoder.num_rows();
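                // `wait_for_loaded` is exclusive ("more than N rows"), so
                // asking for `rows_in_page - 1` waits for the entire page.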
                let need_for_page = (rows_in_page - 1).min(current_need);
                trace!(
                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
                    self.field_index,
                    page_idx,
                    need_for_page,
                    rows_in_page,
                );
                // We might only await part of a page.  This is important for things
                // like the struct<struct<...>> case where we have one outer page, one
                // middle page, and then a bunch of inner pages.  If we await the entire
                // middle page then we will have to wait for all the inner pages to arrive
                // before we can start decoding.
                next_decoder.wait_for_loaded(need_for_page).await?;
                let now_loaded = next_decoder.rows_loaded();
                fully_loaded += now_loaded;
                trace!(
                    "Struct child {} page {} awaited and now has {} loaded rows and we have {} fully loaded",
                    self.field_index,
                    page_idx,
                    now_loaded,
                    fully_loaded
                );
            } else {
                fully_loaded += next_decoder.num_rows();
            }
            if fully_loaded > loaded_need {
                break;
            }
        }
        self.rows_loaded = fully_loaded;
        trace!(
            "Struct child {} now has {} rows loaded",
            self.field_index,
            self.rows_loaded
        );
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
        trace!(
            "Draining {} rows from struct child with {} rows already drained",
            num_rows,
            self.rows_drained
        );
        let mut remaining = num_rows;
        let mut composite = CompositeDecodeTask {
            tasks: Vec::new(),
            num_rows: 0,
            has_more: true,
        };
        while remaining > 0 {
            let next = self.scheduled.front_mut().unwrap();
            let rows_to_take = remaining.min(next.rows_left());
            let next_task = next.drain(rows_to_take)?;
            if next.rows_left() == 0 {
                trace!("Completely drained page");
                self.rows_popped += next.num_rows();
                self.scheduled.pop_front();
            }
            remaining -= rows_to_take;
            composite.tasks.push(next_task.task);
            composite.num_rows += next_task.num_rows;
        }
        self.rows_drained += num_rows;
        composite.has_more = self.rows_drained != self.num_rows;
        Ok(composite)
    }
}

// Wrapper around ChildState that orders by rows_loaded (fewest loaded first)
struct WaitOrder<'a>(&'a mut ChildState);

impl Eq for WaitOrder<'_> {}
impl PartialEq for WaitOrder<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.0.rows_loaded == other.0.rows_loaded
    }
}
impl Ord for WaitOrder<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Note: this is inverted so we have a min-heap
        other.0.rows_loaded.cmp(&self.0.rows_loaded)
    }
}
impl PartialOrd for WaitOrder<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

#[derive(Debug)]
pub struct SimpleStructDecoder {
    children: Vec<ChildState>,
    child_fields: Fields,
    data_type: DataType,
    num_rows: u64,
}

impl SimpleStructDecoder {
    pub fn new(child_fields: Fields, num_rows: u64) -> Self {
        let data_type = DataType::Struct(child_fields.clone());
        Self {
            children: child_fields
                .iter()
                .enumerate()
                .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
                .collect(),
            child_fields,
            data_type,
            num_rows,
        }
    }
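
    /// Waits until every child has loaded more than `loaded_need` rows.
    ///
    /// Children are kept in a min-heap keyed on `rows_loaded` so we always
    /// wait on the child that is furthest behind, and only up to the
    /// next-least-loaded child (or `loaded_need`) before re-evaluating.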
    async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        let mut wait_orders = self
            .children
            .iter_mut()
            .filter_map(|child| {
                if child.rows_loaded <= loaded_need {
                    Some(WaitOrder(child))
                } else {
                    None
                }
            })
            .collect::<BinaryHeap<_>>();
        while !wait_orders.is_empty() {
            let next_waiter = wait_orders.pop().unwrap();
            let next_highest = wait_orders
                .peek()
                .map(|w| w.0.rows_loaded)
                .unwrap_or(u64::MAX);
            // Wait until you have the number of rows needed, or at least more than the
            // next highest waiter
            let limit = loaded_need.min(next_highest);
            next_waiter.0.wait_for_loaded(limit).await?;
            log::trace!(
                "Struct child {} finished await pass and now {} are loaded",
                next_waiter.0.field_index,
                next_waiter.0.rows_loaded
            );
            if next_waiter.0.rows_loaded <= loaded_need {
                wait_orders.push(next_waiter);
            }
        }
        Ok(())
    }
}

impl LogicalPageDecoder for SimpleStructDecoder {
    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
        // Children with an empty path should not be delivered to this method
        let child_idx = child.path.pop_front().unwrap();
        if child.path.is_empty() {
            // This decoder is intended for us
            self.children[child_idx as usize]
                .scheduled
                .push_back(child.decoder);
        } else {
            // This decoder is intended for one of our children
            let intended = self.children[child_idx as usize]
                .scheduled
                .back_mut()
                .ok_or_else(|| Error::Internal {
                    message: format!(
                        "Decoder scheduled for child at index {} but we don't have any child at that index yet",
                        child_idx
                    ),
                    location: location!(),
                })?;
            intended.accept_child(child)?;
        }
        Ok(())
    }

    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        self.do_wait_for_loaded(loaded_need).boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|child| child.drain(num_rows))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_tasks[0].num_rows;
        debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
        Ok(NextDecodeTask {
            task: Box::new(SimpleStructDecodeTask {
                children: child_tasks,
                child_fields: self.child_fields.clone(),
            }),
            num_rows,
        })
    }

    fn rows_loaded(&self) -> u64 {
        self.children.iter().map(|c| c.rows_loaded).min().unwrap()
    }

    fn rows_drained(&self) -> u64 {
        // All children should have the same number of rows drained
        debug_assert!(self
            .children
            .iter()
            .all(|c| c.rows_drained == self.children[0].rows_drained));
        self.children[0].rows_drained
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}
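
/// The decode tasks drained from a single child, one per page touched by the
/// drain.  Decoding runs each task and concatenates the results into one array.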
struct CompositeDecodeTask {
    // One task per page drained from the child
    tasks: Vec<Box<dyn DecodeArrayTask>>,
    num_rows: u64,
    has_more: bool,
}

impl CompositeDecodeTask {
    fn decode(self) -> Result<ArrayRef> {
        let arrays = self
            .tasks
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        // TODO: If this is a primitive column we should be able to avoid this
        // allocation + copy with "page bridging" which could save us a few CPU
        // cycles.
        //
        // This optimization is probably most important for super fast storage like NVMe
        // where the page size can be smaller.
        Ok(arrow_select::concat::concat(&array_refs)?)
    }
}
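
/// Reassembles the decoded child arrays into a single `StructArray`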
struct SimpleStructDecodeTask {
    children: Vec<CompositeDecodeTask>,
    child_fields: Fields,
}

impl DecodeArrayTask for SimpleStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let child_arrays = self
            .children
            .into_iter()
            .map(|child| child.decode())
            .collect::<Result<Vec<_>>>()?;
        Ok(Arc::new(StructArray::try_new(
            self.child_fields,
            child_arrays,
            None,
        )?))
    }
}