lance_encoding_datafusion/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::HashMap,
6    sync::{Arc, Mutex},
7};
8
9use arrow_schema::DataType;
10use lance_core::datatypes::{Field, Schema};
11use lance_encoding::encoder::{
12    default_encoding_strategy, ColumnIndexSequence, EncodingOptions, FieldEncodingStrategy,
13};
14use lance_file::version::LanceFileVersion;
15use zone::{UnloadedPushdown, ZoneMapsFieldEncoder};
16
17pub mod format;
18pub mod substrait;
19pub mod zone;
20
21#[derive(Debug)]
22struct LanceDfFieldDecoderState {
23    /// We assume that all columns have the same number of rows per map
24    #[allow(unused)]
25    rows_per_map: Option<u32>,
26    /// As we visit the decoding tree we populate this with the pushdown
27    /// information that is available.
28    #[allow(unused)]
29    zone_map_buffers: HashMap<u32, UnloadedPushdown>,
30}
31
32/// This strategy is responsible for creating the field scheduler
33/// that handles the pushdown filtering.  It is a top-level scheduler
34/// that uses column info from various leaf schedulers.
35///
36/// The current implementation is a bit of a hack.  It assumes that
37/// the decoder strategy will only be used once.  The very first time
38/// that create_field_scheduler is called, we assume we are at the root.
39///
40/// Field decoding strategies are supposed to be stateless but this one
41/// is not.  As a result, we use a mutex to gather the state even though
42/// we aren't technically doing any concurrency.
43#[derive(Debug)]
44pub struct LanceDfFieldDecoderStrategy {
45    #[allow(unused)]
46    state: Arc<Mutex<Option<LanceDfFieldDecoderState>>>,
47    #[allow(unused)]
48    schema: Arc<Schema>,
49}
50
51impl LanceDfFieldDecoderStrategy {
52    pub fn new(schema: Arc<Schema>) -> Self {
53        Self {
54            state: Arc::new(Mutex::new(None)),
55            schema,
56        }
57    }
58
59    #[allow(unused)]
60    fn initialize(&self) -> bool {
61        let mut state = self.state.lock().unwrap();
62        if state.is_none() {
63            *state = Some(LanceDfFieldDecoderState {
64                rows_per_map: None,
65                zone_map_buffers: HashMap::new(),
66            });
67            true
68        } else {
69            false
70        }
71    }
72
73    #[allow(unused)]
74    fn add_pushdown_field(
75        &self,
76        field: &Field,
77        rows_per_map: u32,
78        unloaded_pushdown: UnloadedPushdown,
79    ) {
80        let mut state = self.state.lock().unwrap();
81        let state = state.as_mut().unwrap();
82        match state.rows_per_map {
83            Some(existing) if existing != rows_per_map => {
84                panic!("Inconsistent rows per map");
85            }
86            _ => {
87                state.rows_per_map = Some(rows_per_map);
88            }
89        }
90        state
91            .zone_map_buffers
92            .insert(field.id as u32, unloaded_pushdown);
93    }
94}
95
// TODO: Reconnect this decoder strategy (again); until then the implementation below stays commented out.
97// impl FieldDecoderStrategy for LanceDfFieldDecoderStrategy {
98//     fn create_field_scheduler<'a>(
99//         &self,
100//         field: &Field,
101//         column_infos: &mut ColumnInfoIter,
102//         buffers: FileBuffers,
103//         chain: DecoderMiddlewareChainCursor<'a>,
104//     ) -> Result<(
105//         DecoderMiddlewareChainCursor<'a>,
106//         Result<Box<dyn FieldScheduler>>,
107//     )> {
108//         let is_root = self.initialize();
109
110//         if let Some((rows_per_map, unloaded_pushdown)) =
111//             extract_zone_info(column_infos, &field.data_type(), chain.current_path())
112//         {
113//             // If there is pushdown info then record it and unwrap the
114//             // pushdown encoding layer.
115//             self.add_pushdown_field(field, rows_per_map, unloaded_pushdown);
116//         }
117//         // Delegate to the rest of the chain to create the decoder
118//         let (chain, next) = chain.next(field, column_infos, buffers)?;
119
120//         // If this is the top level decoder then wrap it with our
121//         // pushdown filtering scheduler.
122//         if is_root {
123//             let state = self.state.lock().unwrap().take().unwrap();
124//             let schema = self.schema.clone();
125//             let rows_per_map = state.rows_per_map;
126//             let zone_map_buffers = state.zone_map_buffers;
127//             let next = next?;
128//             let num_rows = next.num_rows();
129//             if rows_per_map.is_none() {
130//                 // No columns had any pushdown info
131//                 Ok((chain, Ok(next)))
132//             } else {
133//                 let scheduler = ZoneMapsFieldScheduler::new(
134//                     next.into(),
135//                     schema,
136//                     zone_map_buffers,
137//                     rows_per_map.unwrap(),
138//                     num_rows,
139//                 );
140//                 Ok((chain, Ok(Box::new(scheduler))))
141//             }
142//         } else {
143//             Ok((chain, next))
144//         }
145//     }
146// }
147
148/// Wraps the core encoding strategy and adds the encoders from this
149/// crate
150#[derive(Debug)]
151pub struct LanceDfFieldEncodingStrategy {
152    inner: Box<dyn FieldEncodingStrategy>,
153    rows_per_map: u32,
154}
155
156impl Default for LanceDfFieldEncodingStrategy {
157    fn default() -> Self {
158        Self {
159            inner: default_encoding_strategy(LanceFileVersion::default()),
160            rows_per_map: 10000,
161        }
162    }
163}
164
165impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
166    fn create_field_encoder(
167        &self,
168        encoding_strategy_root: &dyn FieldEncodingStrategy,
169        field: &lance_core::datatypes::Field,
170        column_index: &mut ColumnIndexSequence,
171        options: &EncodingOptions,
172    ) -> lance_core::Result<Box<dyn lance_encoding::encoder::FieldEncoder>> {
173        let data_type = field.data_type();
174        if data_type.is_primitive()
175            || matches!(
176                data_type,
177                DataType::Boolean | DataType::Utf8 | DataType::LargeUtf8
178            )
179        {
180            let inner_encoder = self.inner.create_field_encoder(
181                // Don't collect stats on inner string fields
182                self.inner.as_ref(),
183                field,
184                column_index,
185                options,
186            )?;
187            Ok(Box::new(ZoneMapsFieldEncoder::try_new(
188                inner_encoder,
189                data_type,
190                self.rows_per_map,
191            )?))
192        } else {
193            self.inner
194                .create_field_encoder(encoding_strategy_root, field, column_index, options)
195        }
196    }
197}