// lance_encoding/previous/encodings/logical/struct.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::{BinaryHeap, VecDeque},
    ops::Range,
    sync::Arc,
};

use crate::{
    decoder::{
        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
        ScheduledScanLine, SchedulerContext,
    },
    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
};
use arrow_array::{ArrayRef, StructArray};
use arrow_schema::{DataType, Field, Fields};
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
use lance_core::{Error, Result};
use log::trace;
23#[derive(Debug)]
24struct SchedulingJobWithStatus<'a> {
25    col_idx: u32,
26    col_name: &'a str,
27    job: Box<dyn SchedulingJob + 'a>,
28    rows_scheduled: u64,
29    rows_remaining: u64,
30}
31
32impl PartialEq for SchedulingJobWithStatus<'_> {
33    fn eq(&self, other: &Self) -> bool {
34        self.col_idx == other.col_idx
35    }
36}
37
38impl Eq for SchedulingJobWithStatus<'_> {}
39
40impl PartialOrd for SchedulingJobWithStatus<'_> {
41    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
42        Some(self.cmp(other))
43    }
44}
45
46impl Ord for SchedulingJobWithStatus<'_> {
47    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
48        // Note this is reversed to make it min-heap
49        other.rows_scheduled.cmp(&self.rows_scheduled)
50    }
51}
52
/// Decode task that materializes rows of a struct that has no child fields.
#[derive(Debug)]
struct EmptyStructDecodeTask {
    // How many (field-less) struct rows to produce
    num_rows: u64,
}
57
58impl DecodeArrayTask for EmptyStructDecodeTask {
59    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
60        // data_size is only tracked in the v2.1 structural decode path; the legacy
61        // v2.0 path does not need it so we return 0.
62        Ok((
63            Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
64            0,
65        ))
66    }
67}
68
69#[derive(Debug)]
70struct EmptyStructDecoder {
71    num_rows: u64,
72    rows_drained: u64,
73    data_type: DataType,
74}
75
76impl EmptyStructDecoder {
77    fn new(num_rows: u64) -> Self {
78        Self {
79            num_rows,
80            rows_drained: 0,
81            data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
82        }
83    }
84}
85
86impl LogicalPageDecoder for EmptyStructDecoder {
87    fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<'_, Result<()>> {
88        Box::pin(std::future::ready(Ok(())))
89    }
90    fn rows_loaded(&self) -> u64 {
91        self.num_rows
92    }
93    fn rows_unloaded(&self) -> u64 {
94        0
95    }
96    fn num_rows(&self) -> u64 {
97        self.num_rows
98    }
99    fn rows_drained(&self) -> u64 {
100        self.rows_drained
101    }
102    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
103        self.rows_drained += num_rows;
104        Ok(NextDecodeTask {
105            num_rows,
106            task: Box::new(EmptyStructDecodeTask { num_rows }),
107        })
108    }
109    fn data_type(&self) -> &DataType {
110        &self.data_type
111    }
112}
113
/// Scheduling job for a struct that has no child columns.
#[derive(Debug)]
struct EmptyStructSchedulerJob {
    // Total number of rows to report as scheduled
    num_rows: u64,
}
118
119impl SchedulingJob for EmptyStructSchedulerJob {
120    fn schedule_next(
121        &mut self,
122        context: &mut SchedulerContext,
123        _priority: &dyn PriorityRange,
124    ) -> Result<ScheduledScanLine> {
125        let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
126        #[allow(deprecated)]
127        let struct_decoder = context.locate_decoder(empty_decoder);
128        Ok(ScheduledScanLine {
129            decoders: vec![MessageType::DecoderReady(struct_decoder)],
130            rows_scheduled: self.num_rows,
131        })
132    }
133
134    fn num_rows(&self) -> u64 {
135        self.num_rows
136    }
137}
138
139/// Scheduling job for struct data
140///
141/// The order in which we schedule the children is important.  We want to schedule the child
142/// with the least amount of data first.
143///
144/// This allows us to decode entire rows as quickly as possible
145#[derive(Debug)]
146struct SimpleStructSchedulerJob<'a> {
147    scheduler: &'a SimpleStructScheduler,
148    /// A min-heap whose key is the # of rows currently scheduled
149    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
150    rows_scheduled: u64,
151    num_rows: u64,
152    initialized: bool,
153}
154
155impl<'a> SimpleStructSchedulerJob<'a> {
156    fn new(
157        scheduler: &'a SimpleStructScheduler,
158        children: Vec<Box<dyn SchedulingJob + 'a>>,
159        num_rows: u64,
160    ) -> Self {
161        let children = children
162            .into_iter()
163            .enumerate()
164            .map(|(idx, job)| SchedulingJobWithStatus {
165                col_idx: idx as u32,
166                col_name: scheduler.child_fields[idx].name(),
167                job,
168                rows_scheduled: 0,
169                rows_remaining: num_rows,
170            })
171            .collect::<BinaryHeap<_>>();
172        Self {
173            scheduler,
174            children,
175            rows_scheduled: 0,
176            num_rows,
177            initialized: false,
178        }
179    }
180}
181
impl SchedulingJob for SimpleStructSchedulerJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let mut decoders = Vec::new();
        if !self.initialized {
            // Send info to the decoder thread so it knows a struct is here.  In the future we will also
            // send validity info here.
            let struct_decoder = Box::new(SimpleStructDecoder::new(
                self.scheduler.child_fields.clone(),
                self.num_rows,
            ));
            #[allow(deprecated)]
            let struct_decoder = context.locate_decoder(struct_decoder);
            decoders.push(MessageType::DecoderReady(struct_decoder));
            self.initialized = true;
        }
        let old_rows_scheduled = self.rows_scheduled;
        // Schedule as many children as we need to until we have scheduled at least one
        // complete row
        while old_rows_scheduled == self.rows_scheduled {
            // Pop the least-scheduled child (min-heap) so children advance evenly
            let mut next_child = self.children.pop().unwrap();
            trace!("Scheduling more rows for child {}", next_child.col_idx);
            // Scope the context to this child so nested decoders get the right path
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled, next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            // A struct row is fully scheduled only once every child has scheduled it,
            // so our progress equals the minimum across children (the new heap top).
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
231
232/// A scheduler for structs
233///
234/// The implementation is actually a bit more tricky than one might initially think.  We can't just
235/// go through and schedule each column one after the other.  This would mean our decode can't start
236/// until nearly all the data has arrived (since we need data from each column)
237///
238/// Instead, we schedule in row-major fashion
239///
240/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
241/// record batch as a non-nullable struct.
242#[derive(Debug)]
243pub struct SimpleStructScheduler {
244    children: Vec<Arc<dyn FieldScheduler>>,
245    child_fields: Fields,
246    num_rows: u64,
247}
248
249impl SimpleStructScheduler {
250    pub fn new(
251        children: Vec<Arc<dyn FieldScheduler>>,
252        child_fields: Fields,
253        num_rows: u64,
254    ) -> Self {
255        let num_rows = children
256            .first()
257            .map(|child| child.num_rows())
258            .unwrap_or(num_rows);
259        debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
260        Self {
261            children,
262            child_fields,
263            num_rows,
264        }
265    }
266}
267
268impl FieldScheduler for SimpleStructScheduler {
269    fn schedule_ranges<'a>(
270        &'a self,
271        ranges: &[Range<u64>],
272        filter: &FilterExpression,
273    ) -> Result<Box<dyn SchedulingJob + 'a>> {
274        if self.children.is_empty() {
275            return Ok(Box::new(EmptyStructSchedulerJob {
276                num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
277            }));
278        }
279        let child_schedulers = self
280            .children
281            .iter()
282            .map(|child| child.schedule_ranges(ranges, filter))
283            .collect::<Result<Vec<_>>>()?;
284        let num_rows = child_schedulers[0].num_rows();
285        Ok(Box::new(SimpleStructSchedulerJob::new(
286            self,
287            child_schedulers,
288            num_rows,
289        )))
290    }
291
292    fn num_rows(&self) -> u64 {
293        self.num_rows
294    }
295
296    fn initialize<'a>(
297        &'a self,
298        _filter: &'a FilterExpression,
299        _context: &'a SchedulerContext,
300    ) -> BoxFuture<'a, Result<()>> {
301        let futures = self
302            .children
303            .iter()
304            .map(|child| child.initialize(_filter, _context))
305            .collect::<FuturesUnordered<_>>();
306        async move {
307            futures
308                .map(|res| res.map(|_| ()))
309                .try_collect::<Vec<_>>()
310                .await?;
311            Ok(())
312        }
313        .boxed()
314    }
315}
316
317#[derive(Debug)]
318struct ChildState {
319    // As child decoders are scheduled they are added to this queue
320    // Once the decoder is fully drained it is popped from this queue
321    //
322    // TODO: It may be a minor perf optimization, in some rare cases, if we have a separate
323    // "fully awaited but not yet drained" queue so we don't loop through fully awaited pages
324    // during each call to wait.
325    //
326    // Note: This queue may have more than one page in it if the batch size is very large
327    // or pages are very small
328    // TODO: Test this case
329    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
330    // Rows that have been awaited
331    rows_loaded: u64,
332    // Rows that have drained
333    rows_drained: u64,
334    // Rows that have been popped (the decoder has been completely drained and removed from `scheduled`)
335    rows_popped: u64,
336    // Total number of rows in the struct
337    num_rows: u64,
338    // The field index in the struct (used for debugging / logging)
339    field_index: u32,
340}
341
impl ChildState {
    /// Creates a fresh child state with no scheduled pages and zero progress.
    fn new(num_rows: u64, field_index: u32) -> Self {
        Self {
            scheduled: VecDeque::new(),
            rows_loaded: 0,
            rows_drained: 0,
            rows_popped: 0,
            num_rows,
            field_index,
        }
    }

    // Wait for the next set of rows to arrive
    //
    // Wait until we have at least `loaded_need` loaded and stop as soon as we
    // go above that limit.
    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        trace!(
            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
            self.field_index, loaded_need, self.rows_loaded,
        );
        // Rows in pages that were fully drained and popped still count towards
        // the loaded total.
        let mut fully_loaded = self.rows_popped;
        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
            if next_decoder.rows_unloaded() > 0 {
                // This page still has unloaded rows; figure out how many we need from it
                let mut current_need = loaded_need;
                current_need -= fully_loaded;
                let rows_in_page = next_decoder.num_rows();
                let need_for_page = (rows_in_page - 1).min(current_need);
                trace!(
                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
                    self.field_index, page_idx, need_for_page, rows_in_page,
                );
                // We might only await part of a page.  This is important for things
                // like the struct<struct<...>> case where we have one outer page, one
                // middle page, and then a bunch of inner pages.  If we await the entire
                // middle page then we will have to wait for all the inner pages to arrive
                // before we can start decoding.
                next_decoder.wait_for_loaded(need_for_page).await?;
                let now_loaded = next_decoder.rows_loaded();
                fully_loaded += now_loaded;
                trace!(
                    "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
                    self.field_index, page_idx, now_loaded, fully_loaded
                );
            } else {
                // Page is already entirely loaded; count it and move on
                fully_loaded += next_decoder.num_rows();
            }
            // Stop as soon as we have strictly more than the requested amount
            if fully_loaded > loaded_need {
                break;
            }
        }
        self.rows_loaded = fully_loaded;
        trace!(
            "Struct child {} loaded {} new rows and now {} are loaded",
            self.field_index, fully_loaded, self.rows_loaded
        );
        Ok(())
    }

    // Drains `num_rows` rows from the front of the scheduled page queue,
    // collecting one decode task per page touched into a composite task.
    // Fully drained pages are popped from the queue.
    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
        trace!("Struct draining {} rows", num_rows);

        trace!(
            "Draining {} rows from struct page with {} rows already drained",
            num_rows, self.rows_drained
        );
        let mut remaining = num_rows;
        let mut composite = CompositeDecodeTask {
            tasks: Vec::new(),
            num_rows: 0,
            has_more: true,
        };
        while remaining > 0 {
            let next = self.scheduled.front_mut().unwrap();
            // Take as many rows as we still need, capped by what this page has left
            let rows_to_take = remaining.min(next.rows_left());
            let next_task = next.drain(rows_to_take)?;
            if next.rows_left() == 0 {
                trace!("Completely drained page");
                self.rows_popped += next.num_rows();
                self.scheduled.pop_front();
            }
            remaining -= rows_to_take;
            composite.tasks.push(next_task.task);
            composite.num_rows += next_task.num_rows;
        }
        self.rows_drained += num_rows;
        // More data remains until every row of the child has been drained
        composite.has_more = self.rows_drained != self.num_rows;
        Ok(composite)
    }
}
432
433// Wrapper around ChildState that orders using rows_unawaited
434struct WaitOrder<'a>(&'a mut ChildState);
435
436impl Eq for WaitOrder<'_> {}
437impl PartialEq for WaitOrder<'_> {
438    fn eq(&self, other: &Self) -> bool {
439        self.0.rows_loaded == other.0.rows_loaded
440    }
441}
442impl Ord for WaitOrder<'_> {
443    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
444        // Note: this is inverted so we have a min-heap
445        other.0.rows_loaded.cmp(&self.0.rows_loaded)
446    }
447}
448impl PartialOrd for WaitOrder<'_> {
449    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
450        Some(self.cmp(other))
451    }
452}
453
454#[derive(Debug)]
455pub struct SimpleStructDecoder {
456    children: Vec<ChildState>,
457    child_fields: Fields,
458    data_type: DataType,
459    num_rows: u64,
460}
461
impl SimpleStructDecoder {
    /// Creates a decoder with one (initially empty) `ChildState` per child field.
    pub fn new(child_fields: Fields, num_rows: u64) -> Self {
        let data_type = DataType::Struct(child_fields.clone());
        Self {
            children: child_fields
                .iter()
                .enumerate()
                .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
                .collect(),
            child_fields,
            data_type,
            num_rows,
        }
    }

    // Waits until every child has loaded more than `loaded_need` rows.
    //
    // Children are visited least-loaded first (via the `WaitOrder` min-heap) and
    // each child is only awaited up to `loaded_need` or the progress of the next
    // least-loaded child, whichever is smaller, so waiting is interleaved across
    // children rather than fully draining one at a time.
    async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        // Only children that have not yet met the target need to wait
        let mut wait_orders = self
            .children
            .iter_mut()
            .filter_map(|child| {
                if child.rows_loaded <= loaded_need {
                    Some(WaitOrder(child))
                } else {
                    None
                }
            })
            .collect::<BinaryHeap<_>>();
        while !wait_orders.is_empty() {
            let next_waiter = wait_orders.pop().unwrap();
            let next_highest = wait_orders
                .peek()
                .map(|w| w.0.rows_loaded)
                .unwrap_or(u64::MAX);
            // Wait until you have the number of rows needed, or at least more than the
            // next highest waiter
            let limit = loaded_need.min(next_highest);
            next_waiter.0.wait_for_loaded(limit).await?;
            log::trace!(
                "Struct child {} finished await pass and now {} are loaded",
                next_waiter.0.field_index,
                next_waiter.0.rows_loaded
            );
            // If this child still hasn't reached the target, re-queue it for another pass
            if next_waiter.0.rows_loaded <= loaded_need {
                wait_orders.push(next_waiter);
            }
        }
        Ok(())
    }
}
511
512impl LogicalPageDecoder for SimpleStructDecoder {
513    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
514        // children with empty path should not be delivered to this method
515        let child_idx = child.path.pop_front().unwrap();
516        if child.path.is_empty() {
517            // This decoder is intended for us
518            self.children[child_idx as usize]
519                .scheduled
520                .push_back(child.decoder);
521        } else {
522            // This decoder is intended for one of our children
523            let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::internal(format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx)))?;
524            intended.accept_child(child)?;
525        }
526        Ok(())
527    }
528
529    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
530        self.do_wait_for_loaded(loaded_need).boxed()
531    }
532
533    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
534        let child_tasks = self
535            .children
536            .iter_mut()
537            .map(|child| child.drain(num_rows))
538            .collect::<Result<Vec<_>>>()?;
539        let num_rows = child_tasks[0].num_rows;
540        debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
541        Ok(NextDecodeTask {
542            task: Box::new(SimpleStructDecodeTask {
543                children: child_tasks,
544                child_fields: self.child_fields.clone(),
545            }),
546            num_rows,
547        })
548    }
549
550    fn rows_loaded(&self) -> u64 {
551        self.children.iter().map(|c| c.rows_loaded).min().unwrap()
552    }
553
554    fn rows_drained(&self) -> u64 {
555        // All children should have the same number of rows drained
556        debug_assert!(
557            self.children
558                .iter()
559                .all(|c| c.rows_drained == self.children[0].rows_drained)
560        );
561        self.children[0].rows_drained
562    }
563
564    fn num_rows(&self) -> u64 {
565        self.num_rows
566    }
567
568    fn data_type(&self) -> &DataType {
569        &self.data_type
570    }
571}
572
573struct CompositeDecodeTask {
574    // One per child
575    tasks: Vec<Box<dyn DecodeArrayTask>>,
576    num_rows: u64,
577    has_more: bool,
578}
579
580impl CompositeDecodeTask {
581    fn decode(self) -> Result<ArrayRef> {
582        let arrays = self
583            .tasks
584            .into_iter()
585            .map(|task| task.decode().map(|(arr, _)| arr))
586            .collect::<Result<Vec<_>>>()?;
587        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
588        // TODO: If this is a primitive column we should be able to avoid this
589        // allocation + copy with "page bridging" which could save us a few CPU
590        // cycles.
591        //
592        // This optimization is probably most important for super fast storage like NVME
593        // where the page size can be smaller.
594        Ok(arrow_select::concat::concat(&array_refs)?)
595    }
596}
597
598struct SimpleStructDecodeTask {
599    children: Vec<CompositeDecodeTask>,
600    child_fields: Fields,
601}
602
603impl DecodeArrayTask for SimpleStructDecodeTask {
604    fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
605        let child_arrays = self
606            .children
607            .into_iter()
608            .map(|child| child.decode())
609            .collect::<Result<Vec<_>>>()?;
610        // data_size is only tracked in the v2.1 structural decode path; the legacy
611        // v2.0 path does not need it so we return 0.
612        Ok((
613            Arc::new(StructArray::try_new(self.child_fields, child_arrays, None)?),
614            0,
615        ))
616    }
617}