lance_encoding/previous/decoder.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::VecDeque, ops::Range};
5
6use crate::decoder::{
7 FilterExpression, NextDecodeTask, PriorityRange, ScheduledScanLine, SchedulerContext,
8};
9
10use arrow_schema::DataType;
11use futures::future::BoxFuture;
12use lance_core::{Error, Result};
13
/// A scheduling job that is advanced one step at a time
///
/// This is the type returned (boxed) by [`FieldScheduler::schedule_ranges`].
/// Implementations must be `Debug`.
pub trait SchedulingJob: std::fmt::Debug {
    /// Performs the next step of scheduling and returns the resulting
    /// [`ScheduledScanLine`], or an error if the step fails.
    ///
    /// * `context` - the scheduler context, borrowed mutably for the duration of the call
    /// * `priority` - the priority range to use for this step
    ///
    /// NOTE(review): presumably this is called repeatedly until every row reported
    /// by [`Self::num_rows`] has been scheduled - confirm against the callers.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine>;

    /// The number of rows in this job
    ///
    /// NOTE(review): presumably the total number of rows the job will schedule
    /// (not the number remaining) - confirm against implementations.
    fn num_rows(&self) -> u64;
}
23
/// A scheduler for a field's worth of data
///
/// Each field in a reader's output schema maps to one field scheduler.  This scheduler may
/// map to more than one column.  For example, one field of struct data may
/// cover many columns of child data.  In fact, the entire file is treated as one
/// top-level struct field.
///
/// The scheduler is responsible for calculating the necessary I/O.  One schedule_range
/// request could trigger multiple batches of I/O across multiple columns.  The scheduler
/// should emit decoders into the sink as quickly as possible.
///
/// As soon as the scheduler encounters a batch of data that can be decoded then the scheduler
/// should emit a decoder in the "unloaded" state.  The decode stream will pull the decoder
/// and start decoding.
///
/// The order in which decoders are emitted is important.  Pages should be emitted in
/// row-major order allowing decode of complete rows as quickly as possible.
///
/// The `FieldScheduler` should be stateless and `Send` and `Sync`.  This is
/// because it might need to be shared.  For example, a list page has a reference to
/// the field schedulers for its items column.  This is shared with the follow-up I/O
/// task created when the offsets are loaded.
///
/// See [`crate::decoder`] for more information
pub trait FieldScheduler: Send + Sync + std::fmt::Debug {
    /// Called at the beginning of scheduling to initialize the scheduler
    ///
    /// * `filter` - the filter expression for the upcoming scheduling
    /// * `context` - the scheduler context
    ///
    /// Returns a boxed future that resolves to `Ok(())` on success.  The future
    /// borrows `self`, `filter` and `context` for its whole lifetime (`'a`).
    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>>;
    /// Schedules I/O for the requested portions of the field.
    ///
    /// * `ranges` - the portions of the field to schedule (presumably row ranges -
    ///   confirm against callers)
    /// * `filter` - the filter expression to apply
    ///
    /// Returns a boxed [`SchedulingJob`] which may borrow from `self` (`'a`).
    ///
    /// Note: `ranges` must be ordered and non-overlapping
    /// TODO: Support unordered or overlapping ranges in file scheduler
    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>>;
    /// The number of rows in this field
    fn num_rows(&self) -> u64;
}
67
/// A logical page decoder together with the path that locates it in the
/// decoder tree
///
/// This is the value handed to [`LogicalPageDecoder::accept_child`].
#[derive(Debug)]
pub struct DecoderReady {
    /// The decoder that is ready to be decoded
    pub decoder: Box<dyn LogicalPageDecoder>,
    /// The path to the decoder, the first value is the column index
    /// following values, if present, are nested child indices
    ///
    /// For example, a path of [1, 1, 0] would mean to grab the second
    /// column, then the second child, and then the first child.
    ///
    /// It could represent x in the following schema:
    ///
    /// ```text
    /// score: float64
    /// points: struct
    ///   color: string
    ///   location: struct
    ///     x: float64
    /// ```
    ///
    /// Currently, only struct decoders have "children" although other
    /// decoders may at some point as well.  List children are only
    /// handled through indirect I/O at the moment and so they don't
    /// need to be represented (yet)
    pub path: VecDeque<u32>,
}
92
93/// A decoder for a field's worth of data
94///
95/// The decoder is initially "unloaded" (doesn't have all its data). The [`Self::wait_for_loaded`]
96/// method should be called to wait for the needed I/O data before attempting to decode
97/// any further.
98///
99/// Unlike the other decoder types it is assumed that `LogicalPageDecoder` is stateful
100/// and only `Send`. This is why we don't need a `rows_to_skip` argument in [`Self::drain`]
101pub trait LogicalPageDecoder: std::fmt::Debug + Send {
102 /// Add a newly scheduled child decoder
103 ///
104 /// The default implementation does not expect children and returns
105 /// an error.
106 fn accept_child(&mut self, _child: DecoderReady) -> Result<()> {
107 Err(Error::internal(format!(
108 "The decoder {:?} does not expect children but received a child",
109 self
110 )))
111 }
112 /// Waits until at least `num_rows` have been loaded
113 fn wait_for_loaded(&'_ mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>>;
114 /// The number of rows loaded so far
115 fn rows_loaded(&self) -> u64;
116 /// The number of rows that still need loading
117 fn rows_unloaded(&self) -> u64 {
118 self.num_rows() - self.rows_loaded()
119 }
120 /// The total number of rows in the field
121 fn num_rows(&self) -> u64;
122 /// The number of rows that have been drained so far
123 fn rows_drained(&self) -> u64;
124 /// The number of rows that are still available to drain
125 fn rows_left(&self) -> u64 {
126 self.num_rows() - self.rows_drained()
127 }
128 /// Creates a task to decode `num_rows` of data into an array
129 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask>;
130 /// The data type of the decoded data
131 fn data_type(&self) -> &DataType;
132}