// lance_encoding_datafusion/lib.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5 collections::HashMap,
6 sync::{Arc, Mutex},
7};
8
9use arrow_schema::DataType;
10use lance_core::datatypes::{Field, Schema};
11use lance_encoding::encoder::{
12 default_encoding_strategy, ColumnIndexSequence, EncodingOptions, FieldEncodingStrategy,
13};
14use lance_file::version::LanceFileVersion;
15use zone::{UnloadedPushdown, ZoneMapsFieldEncoder};
16
17pub mod format;
18pub mod substrait;
19pub mod zone;
20
/// Mutable state accumulated while walking the decoding tree.
///
/// Created lazily on the first `initialize` call and consumed when the
/// root scheduler is built (see the commented-out trait impl below).
#[derive(Debug)]
struct LanceDfFieldDecoderState {
    /// Number of rows covered by each zone-map entry.
    /// We assume that all columns have the same number of rows per map
    /// (enforced by a panic in `add_pushdown_field`).
    #[allow(unused)]
    rows_per_map: Option<u32>,
    /// As we visit the decoding tree we populate this with the pushdown
    /// information that is available, keyed by field id.
    #[allow(unused)]
    zone_map_buffers: HashMap<u32, UnloadedPushdown>,
}
31
/// This strategy is responsible for creating the field scheduler
/// that handles the pushdown filtering. It is a top-level scheduler
/// that uses column info from various leaf schedulers.
///
/// The current implementation is a bit of a hack. It assumes that
/// the decoder strategy will only be used once. The very first time
/// that create_field_scheduler is called, we assume we are at the root.
///
/// Field decoding strategies are supposed to be stateless but this one
/// is not. As a result, we use a mutex to gather the state even though
/// we aren't technically doing any concurrency.
#[derive(Debug)]
pub struct LanceDfFieldDecoderStrategy {
    /// Shared per-scan state; `None` until `initialize` is first called.
    #[allow(unused)]
    state: Arc<Mutex<Option<LanceDfFieldDecoderState>>>,
    /// Schema of the data being decoded; intended for the root pushdown
    /// scheduler (see the commented-out trait impl below).
    #[allow(unused)]
    schema: Arc<Schema>,
}
50
51impl LanceDfFieldDecoderStrategy {
52 pub fn new(schema: Arc<Schema>) -> Self {
53 Self {
54 state: Arc::new(Mutex::new(None)),
55 schema,
56 }
57 }
58
59 #[allow(unused)]
60 fn initialize(&self) -> bool {
61 let mut state = self.state.lock().unwrap();
62 if state.is_none() {
63 *state = Some(LanceDfFieldDecoderState {
64 rows_per_map: None,
65 zone_map_buffers: HashMap::new(),
66 });
67 true
68 } else {
69 false
70 }
71 }
72
73 #[allow(unused)]
74 fn add_pushdown_field(
75 &self,
76 field: &Field,
77 rows_per_map: u32,
78 unloaded_pushdown: UnloadedPushdown,
79 ) {
80 let mut state = self.state.lock().unwrap();
81 let state = state.as_mut().unwrap();
82 match state.rows_per_map {
83 Some(existing) if existing != rows_per_map => {
84 panic!("Inconsistent rows per map");
85 }
86 _ => {
87 state.rows_per_map = Some(rows_per_map);
88 }
89 }
90 state
91 .zone_map_buffers
92 .insert(field.id as u32, unloaded_pushdown);
93 }
94}
95
96// TODO: Reconnect...again
97// impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy {
98// fn create_field_scheduler<'a>(
99// &self,
100// field: &Field,
101// column_infos: &mut ColumnInfoIter,
102// buffers: FileBuffers,
103// chain: DecoderMiddlewareChainCursor<'a>,
104// ) -> Result<(
105// DecoderMiddlewareChainCursor<'a>,
106// Result<Box<dyn FieldScheduler>>,
107// )> {
108// let is_root = self.initialize();
109
110// if let Some((rows_per_map, unloaded_pushdown)) =
111// extract_zone_info(column_infos, &field.data_type(), chain.current_path())
112// {
113// // If there is pushdown info then record it and unwrap the
114// // pushdown encoding layer.
115// self.add_pushdown_field(field, rows_per_map, unloaded_pushdown);
116// }
117// // Delegate to the rest of the chain to create the decoder
118// let (chain, next) = chain.next(field, column_infos, buffers)?;
119
120// // If this is the top level decoder then wrap it with our
121// // pushdown filtering scheduler.
122// if is_root {
123// let state = self.state.lock().unwrap().take().unwrap();
124// let schema = self.schema.clone();
125// let rows_per_map = state.rows_per_map;
126// let zone_map_buffers = state.zone_map_buffers;
127// let next = next?;
128// let num_rows = next.num_rows();
129// if rows_per_map.is_none() {
130// // No columns had any pushdown info
131// Ok((chain, Ok(next)))
132// } else {
133// let scheduler = ZoneMapsFieldScheduler::new(
134// next.into(),
135// schema,
136// zone_map_buffers,
137// rows_per_map.unwrap(),
138// num_rows,
139// );
140// Ok((chain, Ok(Box::new(scheduler))))
141// }
142// } else {
143// Ok((chain, next))
144// }
145// }
146// }
147
/// Wraps the core encoding strategy and adds the encoders from this
/// crate
#[derive(Debug)]
pub struct LanceDfFieldEncodingStrategy {
    /// The core strategy that all non-zone-map encoding is delegated to.
    inner: Box<dyn FieldEncodingStrategy>,
    /// Number of rows summarized by each zone-map entry.
    rows_per_map: u32,
}
155
156impl Default for LanceDfFieldEncodingStrategy {
157 fn default() -> Self {
158 Self {
159 inner: default_encoding_strategy(LanceFileVersion::default()),
160 rows_per_map: 10000,
161 }
162 }
163}
164
165impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
166 fn create_field_encoder(
167 &self,
168 encoding_strategy_root: &dyn FieldEncodingStrategy,
169 field: &lance_core::datatypes::Field,
170 column_index: &mut ColumnIndexSequence,
171 options: &EncodingOptions,
172 ) -> lance_core::Result<Box<dyn lance_encoding::encoder::FieldEncoder>> {
173 let data_type = field.data_type();
174 if data_type.is_primitive()
175 || matches!(
176 data_type,
177 DataType::Boolean | DataType::Utf8 | DataType::LargeUtf8
178 )
179 {
180 let inner_encoder = self.inner.create_field_encoder(
181 // Don't collect stats on inner string fields
182 self.inner.as_ref(),
183 field,
184 column_index,
185 options,
186 )?;
187 Ok(Box::new(ZoneMapsFieldEncoder::try_new(
188 inner_encoder,
189 data_type,
190 self.rows_per_map,
191 )?))
192 } else {
193 self.inner
194 .create_field_encoder(encoding_strategy_root, field, column_index, options)
195 }
196 }
197}