lance_encoding/previous/encodings/logical/struct.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::{
    collections::{BinaryHeap, VecDeque},
    ops::Range,
    sync::Arc,
};

use crate::{
    decoder::{
        DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
        ScheduledScanLine, SchedulerContext,
    },
    previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
};
use arrow_array::{ArrayRef, StructArray};
use arrow_schema::{DataType, Field, Fields};
use futures::{FutureExt, StreamExt, TryStreamExt, future::BoxFuture, stream::FuturesUnordered};
use lance_core::{Error, Result};
use log::trace;

#[derive(Debug)]
struct SchedulingJobWithStatus<'a> {
    col_idx: u32,
    col_name: &'a str,
    job: Box<dyn SchedulingJob + 'a>,
    rows_scheduled: u64,
    rows_remaining: u64,
}

impl PartialEq for SchedulingJobWithStatus<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.col_idx == other.col_idx
    }
}

impl Eq for SchedulingJobWithStatus<'_> {}

impl PartialOrd for SchedulingJobWithStatus<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SchedulingJobWithStatus<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Note: this is reversed to make it a min-heap
        other.rows_scheduled.cmp(&self.rows_scheduled)
    }
}
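
// `BinaryHeap` is a max-heap, so the reversed comparison above yields a min-heap keyed on
// `rows_scheduled`.  A minimal standalone sketch of the same idea (illustration only, not
// part of this module; it uses `std::cmp::Reverse` instead of a hand-written `Ord`):
//
//     use std::cmp::Reverse;
//     use std::collections::BinaryHeap;
//
//     let mut heap = BinaryHeap::new();
//     heap.push(Reverse(5u64));
//     heap.push(Reverse(2u64));
//     heap.push(Reverse(9u64));
//     // The smallest key is popped first, matching the reversed `Ord` above.
//     assert_eq!(heap.pop(), Some(Reverse(2)));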

#[derive(Debug)]
struct EmptyStructDecodeTask {
    num_rows: u64,
}

impl DecodeArrayTask for EmptyStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        Ok(Arc::new(StructArray::new_empty_fields(
            self.num_rows as usize,
            None,
        )))
    }
}
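
// For reference, `StructArray::new_empty_fields(n, None)` builds a struct array with no
// child columns but a logical length of `n`.  A small illustration (not part of this
// module):
//
//     use arrow_array::{Array, StructArray};
//
//     let arr = StructArray::new_empty_fields(3, None);
//     assert_eq!(arr.len(), 3);
//     assert_eq!(arr.num_columns(), 0);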

#[derive(Debug)]
struct EmptyStructDecoder {
    num_rows: u64,
    rows_drained: u64,
    data_type: DataType,
}

impl EmptyStructDecoder {
    fn new(num_rows: u64) -> Self {
        Self {
            num_rows,
            rows_drained: 0,
            data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
        }
    }
}

impl LogicalPageDecoder for EmptyStructDecoder {
    fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        Box::pin(std::future::ready(Ok(())))
    }
    fn rows_loaded(&self) -> u64 {
        self.num_rows
    }
    fn rows_unloaded(&self) -> u64 {
        0
    }
    fn num_rows(&self) -> u64 {
        self.num_rows
    }
    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(EmptyStructDecodeTask { num_rows }),
        })
    }
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

#[derive(Debug)]
struct EmptyStructSchedulerJob {
    num_rows: u64,
}

impl SchedulingJob for EmptyStructSchedulerJob {
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        _priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
        #[allow(deprecated)]
        let struct_decoder = context.locate_decoder(empty_decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(struct_decoder)],
            rows_scheduled: self.num_rows,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// Scheduling job for struct data
///
/// The order in which we schedule the children is important.  We want to schedule the child
/// with the least amount of data first.
///
/// This allows us to decode entire rows as quickly as possible.
#[derive(Debug)]
struct SimpleStructSchedulerJob<'a> {
    scheduler: &'a SimpleStructScheduler,
    /// A min-heap whose key is the # of rows currently scheduled
    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
    rows_scheduled: u64,
    num_rows: u64,
    initialized: bool,
}

impl<'a> SimpleStructSchedulerJob<'a> {
    fn new(
        scheduler: &'a SimpleStructScheduler,
        children: Vec<Box<dyn SchedulingJob + 'a>>,
        num_rows: u64,
    ) -> Self {
        let children = children
            .into_iter()
            .enumerate()
            .map(|(idx, job)| SchedulingJobWithStatus {
                col_idx: idx as u32,
                col_name: scheduler.child_fields[idx].name(),
                job,
                rows_scheduled: 0,
                rows_remaining: num_rows,
            })
            .collect::<BinaryHeap<_>>();
        Self {
            scheduler,
            children,
            rows_scheduled: 0,
            num_rows,
            initialized: false,
        }
    }
}

impl SchedulingJob for SimpleStructSchedulerJob<'_> {
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let mut decoders = Vec::new();
        if !self.initialized {
            // Send info to the decoder thread so it knows a struct is here.  In the future we will also
            // send validity info here.
            let struct_decoder = Box::new(SimpleStructDecoder::new(
                self.scheduler.child_fields.clone(),
                self.num_rows,
            ));
            #[allow(deprecated)]
            let struct_decoder = context.locate_decoder(struct_decoder);
            decoders.push(MessageType::DecoderReady(struct_decoder));
            self.initialized = true;
        }
        let old_rows_scheduled = self.rows_scheduled;
        // Schedule as many children as we need to until we have scheduled at least one
        // complete row
        while old_rows_scheduled == self.rows_scheduled {
            let mut next_child = self.children.pop().unwrap();
            trace!("Scheduling more rows for child {}", next_child.col_idx);
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled, next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}

/// A scheduler for structs
///
/// The implementation is actually a bit more tricky than one might initially think.  We can't just
/// go through and schedule each column one after the other.  This would mean our decode can't start
/// until nearly all the data has arrived (since we need data from each column).
///
/// Instead, we schedule in row-major fashion.
///
/// Note: this scheduler is the starting point for all decoding.  This is because we treat the top-level
/// record batch as a non-nullable struct.
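///
/// As a hypothetical illustration of the row-major order: given children `a` and `b` whose
/// data is split into pages `a0, a1, ...` and `b0, b1, ...`, the scheduling job repeatedly
/// picks the child with the fewest rows scheduled so far, producing an interleaved order
/// such as `a0, b0, a1, b1, ...` rather than `a0, a1, ..., b0, b1, ...`.  This way complete
/// rows become decodable as soon as the first page of every child has arrived.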
#[derive(Debug)]
pub struct SimpleStructScheduler {
    children: Vec<Arc<dyn FieldScheduler>>,
    child_fields: Fields,
    num_rows: u64,
}

impl SimpleStructScheduler {
    pub fn new(
        children: Vec<Arc<dyn FieldScheduler>>,
        child_fields: Fields,
        num_rows: u64,
    ) -> Self {
        let num_rows = children
            .first()
            .map(|child| child.num_rows())
            .unwrap_or(num_rows);
        debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
        Self {
            children,
            child_fields,
            num_rows,
        }
    }
}

impl FieldScheduler for SimpleStructScheduler {
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        if self.children.is_empty() {
            return Ok(Box::new(EmptyStructSchedulerJob {
                num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
            }));
        }
        let child_schedulers = self
            .children
            .iter()
            .map(|child| child.schedule_ranges(ranges, filter))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_schedulers[0].num_rows();
        Ok(Box::new(SimpleStructSchedulerJob::new(
            self,
            child_schedulers,
            num_rows,
        )))
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn initialize<'a>(
        &'a self,
        _filter: &'a FilterExpression,
        _context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        let futures = self
            .children
            .iter()
            .map(|child| child.initialize(_filter, _context))
            .collect::<FuturesUnordered<_>>();
        async move {
            futures
                .map(|res| res.map(|_| ()))
                .try_collect::<Vec<_>>()
                .await?;
            Ok(())
        }
        .boxed()
    }
}

#[derive(Debug)]
struct ChildState {
    // As child decoders are scheduled they are added to this queue
    // Once the decoder is fully drained it is popped from this queue
    //
    // TODO: It may be a minor perf optimization, in some rare cases, if we have a separate
    // "fully awaited but not yet drained" queue so we don't loop through fully awaited pages
    // during each call to wait.
    //
    // Note: This queue may have more than one page in it if the batch size is very large
    // or pages are very small
    // TODO: Test this case
    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
    // Rows that have been awaited
    rows_loaded: u64,
    // Rows that have drained
    rows_drained: u64,
    // Rows that have been popped (the decoder has been completely drained and removed from `scheduled`)
    rows_popped: u64,
    // Total number of rows in the struct
    num_rows: u64,
    // The field index in the struct (used for debugging / logging)
    field_index: u32,
}

impl ChildState {
    fn new(num_rows: u64, field_index: u32) -> Self {
        Self {
            scheduled: VecDeque::new(),
            rows_loaded: 0,
            rows_drained: 0,
            rows_popped: 0,
            num_rows,
            field_index,
        }
    }

    // Wait for the next set of rows to arrive
    //
    // Wait until we have at least `loaded_need` loaded and stop as soon as we
    // go above that limit.
    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        trace!(
            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
            self.field_index, loaded_need, self.rows_loaded,
        );
        let mut fully_loaded = self.rows_popped;
        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
            if next_decoder.rows_unloaded() > 0 {
                let mut current_need = loaded_need;
                current_need -= fully_loaded;
                let rows_in_page = next_decoder.num_rows();
                let need_for_page = (rows_in_page - 1).min(current_need);
                trace!(
                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
                    self.field_index, page_idx, need_for_page, rows_in_page,
                );
                // We might only await part of a page.  This is important for things
                // like the struct<struct<...>> case where we have one outer page, one
                // middle page, and then a bunch of inner pages.  If we await the entire
                // middle page then we will have to wait for all the inner pages to arrive
                // before we can start decoding.
                next_decoder.wait_for_loaded(need_for_page).await?;
                let now_loaded = next_decoder.rows_loaded();
                fully_loaded += now_loaded;
                trace!(
                    "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
                    self.field_index, page_idx, now_loaded, fully_loaded
                );
            } else {
                fully_loaded += next_decoder.num_rows();
            }
            if fully_loaded > loaded_need {
                break;
            }
        }
        self.rows_loaded = fully_loaded;
        trace!(
            "Struct child {} loaded {} new rows and now {} are loaded",
            self.field_index, fully_loaded, self.rows_loaded
        );
        Ok(())
    }

    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
        trace!("Struct draining {} rows", num_rows);

        trace!(
            "Draining {} rows from struct page with {} rows already drained",
            num_rows, self.rows_drained
        );
        let mut remaining = num_rows;
        let mut composite = CompositeDecodeTask {
            tasks: Vec::new(),
            num_rows: 0,
            has_more: true,
        };
        while remaining > 0 {
            let next = self.scheduled.front_mut().unwrap();
            let rows_to_take = remaining.min(next.rows_left());
            let next_task = next.drain(rows_to_take)?;
            if next.rows_left() == 0 {
                trace!("Completely drained page");
                self.rows_popped += next.num_rows();
                self.scheduled.pop_front();
            }
            remaining -= rows_to_take;
            composite.tasks.push(next_task.task);
            composite.num_rows += next_task.num_rows;
        }
        self.rows_drained += num_rows;
        composite.has_more = self.rows_drained != self.num_rows;
        Ok(composite)
    }
}

// Wrapper around ChildState that orders using rows_loaded
struct WaitOrder<'a>(&'a mut ChildState);

impl Eq for WaitOrder<'_> {}
impl PartialEq for WaitOrder<'_> {
    fn eq(&self, other: &Self) -> bool {
        self.0.rows_loaded == other.0.rows_loaded
    }
}
impl Ord for WaitOrder<'_> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Note: this is inverted so we have a min-heap
        other.0.rows_loaded.cmp(&self.0.rows_loaded)
    }
}
impl PartialOrd for WaitOrder<'_> {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

#[derive(Debug)]
pub struct SimpleStructDecoder {
    children: Vec<ChildState>,
    child_fields: Fields,
    data_type: DataType,
    num_rows: u64,
}

impl SimpleStructDecoder {
    pub fn new(child_fields: Fields, num_rows: u64) -> Self {
        let data_type = DataType::Struct(child_fields.clone());
        Self {
            children: child_fields
                .iter()
                .enumerate()
                .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
                .collect(),
            child_fields,
            data_type,
            num_rows,
        }
    }

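    // Hypothetical illustration of the pacing below: if `loaded_need` is 100 and the
    // children currently have 10, 40, and 80 rows loaded, the child with 10 loaded is
    // awaited first, but only until it has more than min(100, 40) = 40 rows loaded.
    // Each pass advances the furthest-behind child just past the next one, so the
    // children stay roughly in lockstep instead of one column racing ahead of the others.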
    async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        let mut wait_orders = self
            .children
            .iter_mut()
            .filter_map(|child| {
                if child.rows_loaded <= loaded_need {
                    Some(WaitOrder(child))
                } else {
                    None
                }
            })
            .collect::<BinaryHeap<_>>();
        while !wait_orders.is_empty() {
            let next_waiter = wait_orders.pop().unwrap();
            let next_highest = wait_orders
                .peek()
                .map(|w| w.0.rows_loaded)
                .unwrap_or(u64::MAX);
            // Wait until you have the number of rows needed, or at least more than the
            // next highest waiter
            let limit = loaded_need.min(next_highest);
            next_waiter.0.wait_for_loaded(limit).await?;
            log::trace!(
                "Struct child {} finished await pass and now {} are loaded",
                next_waiter.0.field_index,
                next_waiter.0.rows_loaded
            );
            if next_waiter.0.rows_loaded <= loaded_need {
                wait_orders.push(next_waiter);
            }
        }
        Ok(())
    }
}

impl LogicalPageDecoder for SimpleStructDecoder {
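    // Hypothetical illustration of the routing in `accept_child`: a decoder arriving with
    // path [2] is pushed directly onto child 2's `scheduled` queue, while a decoder with
    // path [2, 0] is handed to child 2's most recently scheduled decoder along with the
    // remaining path [0].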
    fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
        // children with empty path should not be delivered to this method
        let child_idx = child.path.pop_front().unwrap();
        if child.path.is_empty() {
            // This decoder is intended for us
            self.children[child_idx as usize]
                .scheduled
                .push_back(child.decoder);
        } else {
            // This decoder is intended for one of our children
            let intended = self.children[child_idx as usize]
                .scheduled
                .back_mut()
                .ok_or_else(|| {
                    Error::internal(format!(
                        "Decoder scheduled for child at index {} but we don't have any child at that index yet",
                        child_idx
                    ))
                })?;
            intended.accept_child(child)?;
        }
        Ok(())
    }

    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        self.do_wait_for_loaded(loaded_need).boxed()
    }

    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let child_tasks = self
            .children
            .iter_mut()
            .map(|child| child.drain(num_rows))
            .collect::<Result<Vec<_>>>()?;
        let num_rows = child_tasks[0].num_rows;
        debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
        Ok(NextDecodeTask {
            task: Box::new(SimpleStructDecodeTask {
                children: child_tasks,
                child_fields: self.child_fields.clone(),
            }),
            num_rows,
        })
    }

    fn rows_loaded(&self) -> u64 {
        self.children.iter().map(|c| c.rows_loaded).min().unwrap()
    }

    fn rows_drained(&self) -> u64 {
        // All children should have the same number of rows drained
        debug_assert!(
            self.children
                .iter()
                .all(|c| c.rows_drained == self.children[0].rows_drained)
        );
        self.children[0].rows_drained
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn data_type(&self) -> &DataType {
        &self.data_type
    }
}

struct CompositeDecodeTask {
    // One per child
    tasks: Vec<Box<dyn DecodeArrayTask>>,
    num_rows: u64,
    has_more: bool,
}

impl CompositeDecodeTask {
    fn decode(self) -> Result<ArrayRef> {
        let arrays = self
            .tasks
            .into_iter()
            .map(|task| task.decode())
            .collect::<Result<Vec<_>>>()?;
        let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
        // TODO: If this is a primitive column we should be able to avoid this
        // allocation + copy with "page bridging" which could save us a few CPU
        // cycles.
        //
        // This optimization is probably most important for super fast storage like NVME
        // where the page size can be smaller.
        Ok(arrow_select::concat::concat(&array_refs)?)
    }
}
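
// `arrow_select::concat::concat` stitches the per-page arrays back into one array.  A small
// illustration of that call (not part of this module):
//
//     use arrow_array::{Array, Int32Array};
//
//     let a = Int32Array::from(vec![1, 2]);
//     let b = Int32Array::from(vec![3]);
//     let merged = arrow_select::concat::concat(&[&a, &b]).unwrap();
//     assert_eq!(merged.len(), 3);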

struct SimpleStructDecodeTask {
    children: Vec<CompositeDecodeTask>,
    child_fields: Fields,
}

impl DecodeArrayTask for SimpleStructDecodeTask {
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        let child_arrays = self
            .children
            .into_iter()
            .map(|child| child.decode())
            .collect::<Result<Vec<_>>>()?;
        Ok(Arc::new(StructArray::try_new(
            self.child_fields,
            child_arrays,
            None,
        )?))
    }
}
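
// A small illustration of the final assembly step (`StructArray::try_new`), not part of
// this module:
//
//     use std::sync::Arc;
//     use arrow_array::{Array, ArrayRef, Int32Array, StructArray};
//     use arrow_schema::{DataType, Field, Fields};
//
//     let fields = Fields::from(vec![Field::new("x", DataType::Int32, false)]);
//     let children: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![1, 2, 3]))];
//     let arr = StructArray::try_new(fields, children, None).unwrap();
//     assert_eq!(arr.len(), 3);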