1use std::{collections::HashMap, sync::Arc};
17
18use arrow_array::{Array, ArrayRef, RecordBatch};
19use arrow_schema::DataType;
20use bytes::{Bytes, BytesMut};
21use futures::future::BoxFuture;
22use lance_core::datatypes::{Field, Schema};
23use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::buffer::LanceBuffer;
28use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
29use crate::compression_config::CompressionParams;
30use crate::decoder::PageEncoding;
31use crate::encodings::logical::list::ListStructuralEncoder;
32use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
33use crate::encodings::logical::r#struct::StructStructuralEncoder;
34use crate::repdef::RepDefBuilder;
35use crate::version::LanceFileVersion;
36use crate::{
37 decoder::{ColumnInfo, PageInfo},
38 format::pb,
39};
40
/// Lower bound for [`EncodingOptions::buffer_alignment`].
///
/// `encode_batch` rejects any alignment that is smaller than this value (or
/// that is not a power of two).
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;
43
44#[derive(Debug)]
50pub struct EncodedPage {
51 pub data: Vec<LanceBuffer>,
53 pub description: PageEncoding,
55 pub num_rows: u64,
57 pub row_number: u64,
64 pub column_idx: u32,
66}
67
68pub struct EncodedColumn {
69 pub column_buffers: Vec<LanceBuffer>,
70 pub encoding: pb::ColumnEncoding,
71 pub final_pages: Vec<EncodedPage>,
72}
73
74impl Default for EncodedColumn {
75 fn default() -> Self {
76 Self {
77 column_buffers: Default::default(),
78 encoding: pb::ColumnEncoding {
79 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
80 },
81 final_pages: Default::default(),
82 }
83 }
84}
85
86pub struct OutOfLineBuffers {
100 position: u64,
101 buffer_alignment: u64,
102 buffers: Vec<LanceBuffer>,
103}
104
105impl OutOfLineBuffers {
106 pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
107 Self {
108 position: base_position,
109 buffer_alignment,
110 buffers: Vec::new(),
111 }
112 }
113
114 pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
115 let position = self.position;
116 self.position += buffer.len() as u64;
117 self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
118 self.buffers.push(buffer);
119 position
120 }
121
122 pub fn take_buffers(self) -> Vec<LanceBuffer> {
123 self.buffers
124 }
125
126 pub fn reset_position(&mut self, position: u64) {
127 self.position = position;
128 }
129}
130
131pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;
133
134pub trait FieldEncoder: Send {
145 fn maybe_encode(
165 &mut self,
166 array: ArrayRef,
167 external_buffers: &mut OutOfLineBuffers,
168 repdef: RepDefBuilder,
169 row_number: u64,
170 num_rows: u64,
171 ) -> Result<Vec<EncodeTask>>;
172 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
181 fn finish(
187 &mut self,
188 external_buffers: &mut OutOfLineBuffers,
189 ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;
190
191 fn num_columns(&self) -> u32;
193}
194
/// Hands out consecutive column indices and remembers which field id received
/// which index.
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    /// The index that will be handed out (or skipped) next.
    current_index: u32,
    /// `(field_id, column_index)` pairs in the order they were assigned.
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    /// Assigns the next column index to `field_id`, records the pair and
    /// returns the index.
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let assigned = self.current_index;
        self.mapping.push((field_id, assigned));
        self.current_index = assigned + 1;
        assigned
    }

    /// Consumes one column index without recording a field for it.
    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}
215
/// Tuning knobs passed to field encoders.
pub struct EncodingOptions {
    /// Defaults to 8 MiB.
    /// NOTE(review): presumably the number of bytes a column may buffer before
    /// a page is emitted - confirm against the encoders.
    pub cache_bytes_per_column: u64,
    /// Defaults to 32 MiB.
    /// NOTE(review): presumably an upper bound on the size of a single page -
    /// confirm against the encoders.
    pub max_page_bytes: u64,
    /// Forwarded to the list and struct encoders. Defaults to `true`, and
    /// `encode_batch` always forces it to `true`.
    pub keep_original_array: bool,
    /// Alignment used for out-of-line buffers. `encode_batch` requires a power
    /// of two that is at least `MIN_PAGE_BUFFER_ALIGNMENT`. Defaults to 64.
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    /// 8 MiB cache per column, 32 MiB pages, original arrays kept, 64-byte
    /// buffer alignment.
    fn default() -> Self {
        Self {
            buffer_alignment: 64,
            keep_original_array: true,
            max_page_bytes: 32 << 20,
            cache_bytes_per_column: 8 << 20,
        }
    }
}
247
248pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
254 fn create_field_encoder(
266 &self,
267 encoding_strategy_root: &dyn FieldEncodingStrategy,
268 field: &Field,
269 column_index: &mut ColumnIndexSequence,
270 options: &EncodingOptions,
271 ) -> Result<Box<dyn FieldEncoder>>;
272}
273
274pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
275 match version.resolve() {
276 LanceFileVersion::Legacy => panic!(),
277 LanceFileVersion::V2_0 => {
278 Box::new(crate::previous::encoder::CoreFieldEncodingStrategy::default())
279 }
280 _ => Box::new(StructuralEncodingStrategy::default()),
281 }
282}
283
284pub fn default_encoding_strategy_with_params(
286 version: LanceFileVersion,
287 params: CompressionParams,
288) -> Result<Box<dyn FieldEncodingStrategy>> {
289 match version.resolve() {
290 LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input(
291 "Compression parameters are only supported in Lance file version 2.1 and later",
292 location!(),
293 )),
294 _ => {
295 let compression_strategy = Arc::new(DefaultCompressionStrategy::with_params(params));
296 Ok(Box::new(StructuralEncodingStrategy {
297 compression_strategy,
298 version,
299 }))
300 }
301 }
302}
303
304#[derive(Debug)]
306pub struct StructuralEncodingStrategy {
307 pub compression_strategy: Arc<dyn CompressionStrategy>,
308 pub version: LanceFileVersion,
309}
310
311#[allow(clippy::derivable_impls)]
314impl Default for StructuralEncodingStrategy {
315 fn default() -> Self {
316 Self {
317 compression_strategy: Arc::new(DefaultCompressionStrategy::new()),
318 version: LanceFileVersion::default(),
319 }
320 }
321}
322
323impl StructuralEncodingStrategy {
324 fn is_primitive_type(data_type: &DataType) -> bool {
325 matches!(
326 data_type,
327 DataType::Boolean
328 | DataType::Date32
329 | DataType::Date64
330 | DataType::Decimal128(_, _)
331 | DataType::Decimal256(_, _)
332 | DataType::Duration(_)
333 | DataType::Float16
334 | DataType::Float32
335 | DataType::Float64
336 | DataType::Int16
337 | DataType::Int32
338 | DataType::Int64
339 | DataType::Int8
340 | DataType::Interval(_)
341 | DataType::Null
342 | DataType::Time32(_)
343 | DataType::Time64(_)
344 | DataType::Timestamp(_, _)
345 | DataType::UInt16
346 | DataType::UInt32
347 | DataType::UInt64
348 | DataType::UInt8
349 | DataType::FixedSizeBinary(_)
350 | DataType::FixedSizeList(_, _)
351 | DataType::Binary
352 | DataType::LargeBinary
353 | DataType::Utf8
354 | DataType::LargeUtf8,
355 )
356 }
357
358 fn do_create_field_encoder(
359 &self,
360 _encoding_strategy_root: &dyn FieldEncodingStrategy,
361 field: &Field,
362 column_index: &mut ColumnIndexSequence,
363 options: &EncodingOptions,
364 root_field_metadata: &HashMap<String, String>,
365 ) -> Result<Box<dyn FieldEncoder>> {
366 let data_type = field.data_type();
367 if Self::is_primitive_type(&data_type) {
368 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
369 options,
370 self.compression_strategy.clone(),
371 column_index.next_column_index(field.id as u32),
372 field.clone(),
373 Arc::new(root_field_metadata.clone()),
374 )?))
375 } else {
376 match data_type {
377 DataType::List(_) | DataType::LargeList(_) => {
378 let child = field.children.first().expect("List should have a child");
379 let child_encoder = self.do_create_field_encoder(
380 _encoding_strategy_root,
381 child,
382 column_index,
383 options,
384 root_field_metadata,
385 )?;
386 Ok(Box::new(ListStructuralEncoder::new(
387 options.keep_original_array,
388 child_encoder,
389 )))
390 }
391 DataType::Struct(_) => {
392 if field.is_packed_struct() {
393 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
394 options,
395 self.compression_strategy.clone(),
396 column_index.next_column_index(field.id as u32),
397 field.clone(),
398 Arc::new(root_field_metadata.clone()),
399 )?))
400 } else {
401 let children_encoders = field
402 .children
403 .iter()
404 .map(|field| {
405 self.do_create_field_encoder(
406 _encoding_strategy_root,
407 field,
408 column_index,
409 options,
410 root_field_metadata,
411 )
412 })
413 .collect::<Result<Vec<_>>>()?;
414 Ok(Box::new(StructStructuralEncoder::new(
415 options.keep_original_array,
416 children_encoders,
417 )))
418 }
419 }
420 DataType::Dictionary(_, value_type) => {
421 if Self::is_primitive_type(&value_type) {
423 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
424 options,
425 self.compression_strategy.clone(),
426 column_index.next_column_index(field.id as u32),
427 field.clone(),
428 Arc::new(root_field_metadata.clone()),
429 )?))
430 } else {
431 Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
437 }
438 }
439 _ => todo!("Implement encoding for field {}", field),
440 }
441 }
442 }
443}
444
445impl FieldEncodingStrategy for StructuralEncodingStrategy {
446 fn create_field_encoder(
447 &self,
448 encoding_strategy_root: &dyn FieldEncodingStrategy,
449 field: &Field,
450 column_index: &mut ColumnIndexSequence,
451 options: &EncodingOptions,
452 ) -> Result<Box<dyn FieldEncoder>> {
453 self.do_create_field_encoder(
454 encoding_strategy_root,
455 field,
456 column_index,
457 options,
458 &field.metadata,
459 )
460 }
461}
462
463pub struct BatchEncoder {
466 pub field_encoders: Vec<Box<dyn FieldEncoder>>,
467 pub field_id_to_column_index: Vec<(u32, u32)>,
468}
469
470impl BatchEncoder {
471 pub fn try_new(
472 schema: &Schema,
473 strategy: &dyn FieldEncodingStrategy,
474 options: &EncodingOptions,
475 ) -> Result<Self> {
476 let mut col_idx = 0;
477 let mut col_idx_sequence = ColumnIndexSequence::default();
478 let field_encoders = schema
479 .fields
480 .iter()
481 .map(|field| {
482 let encoder = strategy.create_field_encoder(
483 strategy,
484 field,
485 &mut col_idx_sequence,
486 options,
487 )?;
488 col_idx += encoder.as_ref().num_columns();
489 Ok(encoder)
490 })
491 .collect::<Result<Vec<_>>>()?;
492 Ok(Self {
493 field_encoders,
494 field_id_to_column_index: col_idx_sequence.mapping,
495 })
496 }
497
498 pub fn num_columns(&self) -> u32 {
499 self.field_encoders
500 .iter()
501 .map(|field_encoder| field_encoder.num_columns())
502 .sum::<u32>()
503 }
504}
505
506#[derive(Debug)]
510pub struct EncodedBatch {
511 pub data: Bytes,
512 pub page_table: Vec<Arc<ColumnInfo>>,
513 pub schema: Arc<Schema>,
514 pub top_level_columns: Vec<u32>,
515 pub num_rows: u64,
516}
517
518fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
519 let buffers = page.data;
520 let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
521 for buffer in buffers {
522 let buffer_offset = data_buffer.len() as u64;
523 data_buffer.extend_from_slice(&buffer);
524 let size = data_buffer.len() as u64 - buffer_offset;
525 buffer_offsets_and_sizes.push((buffer_offset, size));
526 }
527
528 PageInfo {
529 buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
530 encoding: page.description,
531 num_rows: page.num_rows,
532 priority: page.row_number,
533 }
534}
535
536pub async fn encode_batch(
541 batch: &RecordBatch,
542 schema: Arc<Schema>,
543 encoding_strategy: &dyn FieldEncodingStrategy,
544 options: &EncodingOptions,
545) -> Result<EncodedBatch> {
546 if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
547 {
548 return Err(Error::InvalidInput {
549 source: format!(
550 "buffer_alignment must be a power of two and at least {}",
551 MIN_PAGE_BUFFER_ALIGNMENT
552 )
553 .into(),
554 location: location!(),
555 });
556 }
557
558 let mut data_buffer = BytesMut::new();
559 let lance_schema = Schema::try_from(batch.schema().as_ref())?;
560 let options = EncodingOptions {
561 keep_original_array: true,
562 ..*options
563 };
564 let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
565 let mut page_table = Vec::new();
566 let mut col_idx_offset = 0;
567 for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
568 let mut external_buffers =
569 OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
570 let repdef = RepDefBuilder::default();
571 let encoder = encoder.as_mut();
572 let num_rows = arr.len() as u64;
573 let mut tasks =
574 encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
575 tasks.extend(encoder.flush(&mut external_buffers)?);
576 for buffer in external_buffers.take_buffers() {
577 data_buffer.extend_from_slice(&buffer);
578 }
579 let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
580 for task in tasks {
581 let encoded_page = task.await?;
582 pages
584 .entry(encoded_page.column_idx)
585 .or_default()
586 .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
587 }
588 let mut external_buffers =
589 OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
590 let encoded_columns = encoder.finish(&mut external_buffers).await?;
591 for buffer in external_buffers.take_buffers() {
592 data_buffer.extend_from_slice(&buffer);
593 }
594 let num_columns = encoded_columns.len();
595 for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
596 let col_idx = col_idx + col_idx_offset;
597 let mut col_buffer_offsets_and_sizes = Vec::new();
598 for buffer in encoded_column.column_buffers {
599 let buffer_offset = data_buffer.len() as u64;
600 data_buffer.extend_from_slice(&buffer);
601 let size = data_buffer.len() as u64 - buffer_offset;
602 col_buffer_offsets_and_sizes.push((buffer_offset, size));
603 }
604 for page in encoded_column.final_pages {
605 pages
606 .entry(page.column_idx)
607 .or_default()
608 .push(write_page_to_data_buffer(page, &mut data_buffer));
609 }
610 let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
611 page_table.push(Arc::new(ColumnInfo {
612 index: col_idx as u32,
613 buffer_offsets_and_sizes: Arc::from(
614 col_buffer_offsets_and_sizes.into_boxed_slice(),
615 ),
616 page_infos: Arc::from(col_pages.into_boxed_slice()),
617 encoding: encoded_column.encoding,
618 }))
619 }
620 col_idx_offset += num_columns;
621 }
622 let top_level_columns = batch_encoder
623 .field_id_to_column_index
624 .iter()
625 .map(|(_, idx)| *idx)
626 .collect();
627 Ok(EncodedBatch {
628 data: data_buffer.freeze(),
629 top_level_columns,
630 page_table,
631 schema,
632 num_rows: batch.num_rows() as u64,
633 })
634}
635
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression_config::{CompressionFieldParams, CompressionParams};

    /// Compression parameters are accepted for 2.1 and rejected for 2.0 and
    /// legacy files.
    #[test]
    fn test_configured_encoding_strategy() {
        const UNSUPPORTED_MSG: &str = "only supported in Lance file version 2.1";

        let id_column_params = CompressionFieldParams {
            rle_threshold: Some(0.5),
            compression: Some("lz4".to_string()),
            compression_level: None,
        };
        let mut params = CompressionParams::new();
        params.columns.insert("*_id".to_string(), id_column_params);

        // Supported version: a structural strategy with the default
        // compression strategy type is returned.
        let strategy =
            default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone())
                .expect("Should succeed for V2.1");
        let debug_repr = format!("{:?}", strategy);
        assert!(debug_repr.contains("StructuralEncodingStrategy"));
        assert!(debug_repr.contains("DefaultCompressionStrategy"));

        // Unsupported versions: the error names the minimum version.
        let rejected = [
            (LanceFileVersion::V2_0, "Should fail for V2.0"),
            (LanceFileVersion::Legacy, "Should fail for Legacy"),
        ];
        for (version, expectation) in rejected {
            let err = default_encoding_strategy_with_params(version, params.clone())
                .expect_err(expectation);
            assert!(err.to_string().contains(UNSUPPORTED_MSG));
        }
    }
}