lance_encoding/previous/decoder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::VecDeque, ops::Range};
5
6use snafu::location;
7
8use crate::decoder::{
9    FilterExpression, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext,
10};
11
12use arrow_schema::DataType;
13use futures::future::BoxFuture;
14use lance_core::{Error, Result};
15
16pub trait SchedulingJob: std::fmt::Debug {
17    fn schedule_next(
18        &mut self,
19        context: &mut SchedulerContext,
20        priority: &dyn PriorityRange,
21    ) -> Result<ScheduledScanLine>;
22
23    fn num_rows(&self) -> u64;
24}
25
26/// A scheduler for a field's worth of data
27///
28/// Each field in a reader's output schema maps to one field scheduler.  This scheduler may
29/// map to more than one column.  For example, one field of struct data may
30/// cover many columns of child data.  In fact, the entire file is treated as one
31/// top-level struct field.
32///
33/// The scheduler is responsible for calculating the necessary I/O.  One schedule_range
34/// request could trigger multiple batches of I/O across multiple columns.  The scheduler
35/// should emit decoders into the sink as quickly as possible.
36///
37/// As soon as the scheduler encounters a batch of data that can decoded then the scheduler
38/// should emit a decoder in the "unloaded" state.  The decode stream will pull the decoder
39/// and start decoding.
40///
41/// The order in which decoders are emitted is important.  Pages should be emitted in
42/// row-major order allowing decode of complete rows as quickly as possible.
43///
44/// The `FieldScheduler` should be stateless and `Send` and `Sync`.  This is
45/// because it might need to be shared.  For example, a list page has a reference to
46/// the field schedulers for its items column.  This is shared with the follow-up I/O
47/// task created when the offsets are loaded.
48///
49/// See [`crate::decoder`] for more information
50pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
51    /// Called at the beginning of scheduling to initialize the scheduler
52    fn initialize<'a>(
53        &'a self,
54        filter: &'a FilterExpression,
55        context: &'a SchedulerContext,
56    ) -> BoxFuture<'a, Result<()>>;
57    /// Schedules I/O for the requested portions of the field.
58    ///
59    /// Note: `ranges` must be ordered and non-overlapping
60    /// TODO: Support unordered or overlapping ranges in file scheduler
61    fn schedule_ranges<'a>(
62        &'a self,
63        ranges: &[Range<u64>],
64        filter: &FilterExpression,
65    ) -> Result<Box<dyn SchedulingJob + 'a>>;
66    /// The number of rows in this field
67    fn num_rows(&self) -> u64;
68}
69
70#[derive(Debug)]
71pub struct DecoderReady {
72    // The decoder that is ready to be decoded
73    pub decoder: Box<dyn LogicalPageDecoder>,
74    // The path to the decoder, the first value is the column index
75    // following values, if present, are nested child indices
76    //
77    // For example, a path of [1, 1, 0] would mean to grab the second
78    // column, then the second child, and then the first child.
79    //
80    // It could represent x in the following schema:
81    //
82    // score: float64
83    // points: struct
84    //   color: string
85    //   location: struct
86    //     x: float64
87    //
88    // Currently, only struct decoders have "children" although other
89    // decoders may at some point as well.  List children are only
90    // handled through indirect I/O at the moment and so they don't
91    // need to be represented (yet)
92    pub path: VecDeque<u32>,
93}
94
95/// A decoder for a field's worth of data
96///
97/// The decoder is initially "unloaded" (doesn't have all its data).  The [`Self::wait`]
98/// method should be called to wait for the needed I/O data before attempting to decode
99/// any further.
100///
101/// Unlike the other decoder types it is assumed that `LogicalPageDecoder` is stateful
102/// and only `Send`.  This is why we don't need a `rows_to_skip` argument in [`Self::drain`]
103pub trait LogicalPageDecoder: std::fmt::Debug + Send {
104    /// Add a newly scheduled child decoder
105    ///
106    /// The default implementation does not expect children and returns
107    /// an error.
108    fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
109        Err(Error::Internal {
110            message: format!(
111                "The decoder {:?} does not expect children but received a child",
112                self
113            ),
114            location: location!(),
115        })
116    }
117    /// Waits until at least `num_rows` have been loaded
118    fn wait_for_loaded(&'_ mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>>;
119    /// The number of rows loaded so far
120    fn rows_loaded(&self) -> u64;
121    /// The number of rows that still need loading
122    fn rows_unloaded(&self) -> u64 {
123        self.num_rows() - self.rows_loaded()
124    }
125    /// The total number of rows in the field
126    fn num_rows(&self) -> u64;
127    /// The number of rows that have been drained so far
128    fn rows_drained(&self) -> u64;
129    /// The number of rows that are still available to drain
130    fn rows_left(&self) -> u64 {
131        self.num_rows() - self.rows_drained()
132    }
133    /// Creates a task to decode `num_rows` of data into an array
134    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
135    /// The data type of the decoded data
136    fn data_type(&self) -> &DataType;
137}