use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use arrow_schema::DataType;
use lance_core::datatypes::{Field, Schema};
use lance_encoding::encoder::{
default_encoding_strategy, ColumnIndexSequence, EncodingOptions, FieldEncodingStrategy,
};
use lance_file::version::LanceFileVersion;
use zone::{UnloadedPushdown, ZoneMapsFieldEncoder};
pub mod format;
pub mod substrait;
pub mod zone;
/// Mutable decoder-side state stored behind the mutex of `LanceDfFieldDecoderStrategy`.
#[derive(Debug)]
struct LanceDfFieldDecoderState {
/// Rows-per-map value shared by every registered field. `None` until
/// `add_pushdown_field` records the first value.
#[allow(unused)]
rows_per_map: Option<u32>,
/// Unloaded pushdown data for each registered field, keyed by `field.id as u32`.
#[allow(unused)]
zone_map_buffers: HashMap<u32, UnloadedPushdown>,
}
/// Decoder-side strategy that owns a schema and a shared, lazily created decoder state.
#[derive(Debug)]
pub struct LanceDfFieldDecoderStrategy {
/// Shared decoder state. Starts as `None` (see `new`) and is filled in by `initialize`.
#[allow(unused)]
state: Arc<Mutex<Option<LanceDfFieldDecoderState>>>,
/// Schema handed to `new`.
/// NOTE(review): nothing in the visible code reads this field yet — confirm intended use.
#[allow(unused)]
schema: Arc<Schema>,
}
impl LanceDfFieldDecoderStrategy {
    /// Creates a strategy for `schema` whose shared decoder state is initially unset.
    pub fn new(schema: Arc<Schema>) -> Self {
        let state = Arc::new(Mutex::new(None));
        Self { state, schema }
    }

    /// Installs an empty decoder state if none is present yet.
    ///
    /// Returns `true` when this call installed the state, and `false` when a
    /// state already existed (the existing state is left untouched).
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    #[allow(unused)]
    fn initialize(&self) -> bool {
        let mut guard = self.state.lock().unwrap();
        if guard.is_some() {
            return false;
        }
        *guard = Some(LanceDfFieldDecoderState {
            rows_per_map: None,
            zone_map_buffers: HashMap::new(),
        });
        true
    }

    /// Registers the unloaded pushdown data for `field` and records `rows_per_map`.
    ///
    /// The pushdown data is stored under the key `field.id as u32`, replacing any
    /// entry previously stored under that key.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned, if no state has been installed
    /// (see `initialize`), or if a previously recorded rows-per-map value
    /// differs from `rows_per_map`.
    #[allow(unused)]
    fn add_pushdown_field(
        &self,
        field: &Field,
        rows_per_map: u32,
        unloaded_pushdown: UnloadedPushdown,
    ) {
        let mut guard = self.state.lock().unwrap();
        let decoder_state = guard.as_mut().unwrap();

        // Every field registered on this strategy must agree on the rows-per-map value.
        let conflicting = matches!(
            decoder_state.rows_per_map,
            Some(existing) if existing != rows_per_map
        );
        if conflicting {
            panic!("Inconsistent rows per map");
        }
        decoder_state.rows_per_map = Some(rows_per_map);

        // NOTE(review): `as` is a lossy cast; if `field.id` can be negative it
        // would wrap to a large key here — confirm ids are always non-negative.
        let field_key = field.id as u32;
        decoder_state
            .zone_map_buffers
            .insert(field_key, unloaded_pushdown);
    }
}
/// Field encoding strategy that delegates to an inner strategy and, for some
/// field types, wraps the resulting encoder in a `ZoneMapsFieldEncoder`.
#[derive(Debug)]
pub struct LanceDfFieldEncodingStrategy {
/// Strategy used to create the underlying field encoders.
inner: Box<dyn FieldEncodingStrategy>,
/// Value forwarded to `ZoneMapsFieldEncoder::try_new`.
/// NOTE(review): presumably the number of rows covered by each zone map — confirm.
rows_per_map: u32,
}
impl Default for LanceDfFieldEncodingStrategy {
    /// Builds a strategy around the default encoding strategy for
    /// `LanceFileVersion::default()`, with `rows_per_map` set to 10,000.
    fn default() -> Self {
        const DEFAULT_ROWS_PER_MAP: u32 = 10_000;

        let inner = default_encoding_strategy(LanceFileVersion::default());
        Self {
            inner,
            rows_per_map: DEFAULT_ROWS_PER_MAP,
        }
    }
}
impl FieldEncodingStrategy for LanceDfFieldEncodingStrategy {
    /// Creates the encoder for `field`.
    ///
    /// Fields whose data type is primitive, boolean, `Utf8` or `LargeUtf8` get
    /// the inner strategy's encoder wrapped in a `ZoneMapsFieldEncoder`. Every
    /// other field is delegated to the inner strategy unchanged.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the inner strategy or by
    /// `ZoneMapsFieldEncoder::try_new`.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &lance_core::datatypes::Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> lance_core::Result<Box<dyn lance_encoding::encoder::FieldEncoder>> {
        let data_type = field.data_type();
        let is_primitive = data_type.is_primitive();
        let is_bool_or_string = matches!(
            data_type,
            DataType::Boolean | DataType::Utf8 | DataType::LargeUtf8
        );

        if !(is_primitive || is_bool_or_string) {
            // Pass-through: the caller's root strategy is forwarded as-is.
            return self.inner.create_field_encoder(
                encoding_strategy_root,
                field,
                column_index,
                options,
            );
        }

        // The inner strategy is passed as its own root here instead of
        // `encoding_strategy_root`.
        // NOTE(review): presumably so that any encoders it creates for nested
        // fields are not wrapped in zone maps as well — confirm.
        let inner_root = self.inner.as_ref();
        let inner_encoder =
            self.inner
                .create_field_encoder(inner_root, field, column_index, options)?;
        let zone_maps_encoder =
            ZoneMapsFieldEncoder::try_new(inner_encoder, data_type, self.rows_per_map)?;
        Ok(Box::new(zone_maps_encoder))
    }
}