Skip to main content

lance_encoding/previous/
decoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::VecDeque, ops::Range};
5
6use crate::decoder::{
7    FilterExpression, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext,
8};
9
10use arrow_schema::DataType;
11use futures::future::BoxFuture;
12use lance_core::{Error, Result};
13
14pub trait SchedulingJob: std::fmt::Debug {
15    fn schedule_next(
16        &mut self,
17        context: &mut SchedulerContext,
18        priority: &dyn PriorityRange,
19    ) -> Result<ScheduledScanLine>;
20
21    fn num_rows(&self) -> u64;
22}
23
24/// A scheduler for a field's worth of data
25///
26/// Each field in a reader's output schema maps to one field scheduler.  This scheduler may
27/// map to more than one column.  For example, one field of struct data may
28/// cover many columns of child data.  In fact, the entire file is treated as one
29/// top-level struct field.
30///
31/// The scheduler is responsible for calculating the necessary I/O.  One schedule_range
32/// request could trigger multiple batches of I/O across multiple columns.  The scheduler
33/// should emit decoders into the sink as quickly as possible.
34///
35/// As soon as the scheduler encounters a batch of data that can decoded then the scheduler
36/// should emit a decoder in the "unloaded" state.  The decode stream will pull the decoder
37/// and start decoding.
38///
39/// The order in which decoders are emitted is important.  Pages should be emitted in
40/// row-major order allowing decode of complete rows as quickly as possible.
41///
42/// The `FieldScheduler` should be stateless and `Send` and `Sync`.  This is
43/// because it might need to be shared.  For example, a list page has a reference to
44/// the field schedulers for its items column.  This is shared with the follow-up I/O
45/// task created when the offsets are loaded.
46///
47/// See [`crate::decoder`] for more information
48pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
49    /// Called at the beginning of scheduling to initialize the scheduler
50    fn initialize<'a>(
51        &'a self,
52        filter: &'a FilterExpression,
53        context: &'a SchedulerContext,
54    ) -> BoxFuture<'a, Result<()>>;
55    /// Schedules I/O for the requested portions of the field.
56    ///
57    /// Note: `ranges` must be ordered and non-overlapping
58    /// TODO: Support unordered or overlapping ranges in file scheduler
59    fn schedule_ranges<'a>(
60        &'a self,
61        ranges: &[Range<u64>],
62        filter: &FilterExpression,
63    ) -> Result<Box<dyn SchedulingJob + 'a>>;
64    /// The number of rows in this field
65    fn num_rows(&self) -> u64;
66}
67
68#[derive(Debug)]
69pub struct DecoderReady {
70    // The decoder that is ready to be decoded
71    pub decoder: Box<dyn LogicalPageDecoder>,
72    // The path to the decoder, the first value is the column index
73    // following values, if present, are nested child indices
74    //
75    // For example, a path of [1, 1, 0] would mean to grab the second
76    // column, then the second child, and then the first child.
77    //
78    // It could represent x in the following schema:
79    //
80    // score: float64
81    // points: struct
82    //   color: string
83    //   location: struct
84    //     x: float64
85    //
86    // Currently, only struct decoders have "children" although other
87    // decoders may at some point as well.  List children are only
88    // handled through indirect I/O at the moment and so they don't
89    // need to be represented (yet)
90    pub path: VecDeque<u32>,
91}
92
93/// A decoder for a field's worth of data
94///
95/// The decoder is initially "unloaded" (doesn't have all its data).  The [`Self::wait_for_loaded`]
96/// method should be called to wait for the needed I/O data before attempting to decode
97/// any further.
98///
99/// Unlike the other decoder types it is assumed that `LogicalPageDecoder` is stateful
100/// and only `Send`.  This is why we don't need a `rows_to_skip` argument in [`Self::drain`]
101pub trait LogicalPageDecoder: std::fmt::Debug + Send {
102    /// Add a newly scheduled child decoder
103    ///
104    /// The default implementation does not expect children and returns
105    /// an error.
106    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
107        Err(Error::internal(format!(
108            "The decoder {:?} does not expect children but received a child",
109            self
110        )))
111    }
112    /// Waits until at least `num_rows` have been loaded
113    fn wait_for_loaded(&'_ mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>>;
114    /// The number of rows loaded so far
115    fn rows_loaded(&self) -> u64;
116    /// The number of rows that still need loading
117    fn rows_unloaded(&self) -> u64 {
118        self.num_rows() - self.rows_loaded()
119    }
120    /// The total number of rows in the field
121    fn num_rows(&self) -> u64;
122    /// The number of rows that have been drained so far
123    fn rows_drained(&self) -> u64;
124    /// The number of rows that are still available to drain
125    fn rows_left(&self) -> u64 {
126        self.num_rows() - self.rows_drained()
127    }
128    /// Creates a task to decode `num_rows` of data into an array
129    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
130    /// The data type of the decoded data
131    fn data_type(&self) -> &DataType;
132}