1use std::{collections::HashMap, sync::Arc};
17
18use arrow_array::{Array, ArrayRef, RecordBatch};
19use arrow_schema::DataType;
20use bytes::{Bytes, BytesMut};
21use futures::future::BoxFuture;
22use lance_core::datatypes::{Field, Schema};
23use lance_core::error::LanceOptionExt;
24use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
25use lance_core::{Error, Result};
26
27use crate::buffer::LanceBuffer;
28use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
29use crate::compression_config::CompressionParams;
30use crate::decoder::PageEncoding;
31use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder};
32use crate::encodings::logical::fixed_size_list::FixedSizeListStructuralEncoder;
33use crate::encodings::logical::list::ListStructuralEncoder;
34use crate::encodings::logical::map::MapStructuralEncoder;
35use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
36use crate::encodings::logical::r#struct::StructStructuralEncoder;
37use crate::repdef::RepDefBuilder;
38use crate::version::LanceFileVersion;
39use crate::{
40 decoder::{ColumnInfo, PageInfo},
41 format::pb,
42};
43
/// Minimum alignment (in bytes) allowed for page buffers; `encode_batch` rejects
/// any `EncodingOptions::buffer_alignment` smaller than this or not a power of two.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;
46
#[derive(Debug)]
/// A single encoded page of data together with the metadata needed to
/// write it into a file and later locate/decode it.
pub struct EncodedPage {
    // The encoded buffers that make up the page, written out in order
    pub data: Vec<LanceBuffer>,
    // Description of how the page was encoded (used by the decoder)
    pub description: PageEncoding,
    // Number of rows covered by this page
    pub num_rows: u64,
    // Row number of the first row in the page; used as the page priority
    // when building `PageInfo` (see `write_page_to_data_buffer`)
    pub row_number: u64,
    // Index of the column this page belongs to
    pub column_idx: u32,
}
70
/// The result of finishing a column: any column-level buffers, the column's
/// encoding description, and any pages produced at finalization time.
pub struct EncodedColumn {
    // Column-level (as opposed to page-level) buffers
    pub column_buffers: Vec<LanceBuffer>,
    // Protobuf description of the column encoding
    pub encoding: pb::ColumnEncoding,
    // Pages emitted when the column was finished (after all data was seen)
    pub final_pages: Vec<EncodedPage>,
}
76
77impl Default for EncodedColumn {
78 fn default() -> Self {
79 Self {
80 column_buffers: Default::default(),
81 encoding: pb::ColumnEncoding {
82 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
83 },
84 final_pages: Default::default(),
85 }
86 }
87}
88
/// Collects buffers that are written outside of any page ("out of line")
/// and assigns each one a file position, advancing by the buffer length
/// plus alignment padding.
pub struct OutOfLineBuffers {
    // Position that will be assigned to the next buffer added
    position: u64,
    // Alignment (bytes) used to pad positions between buffers
    buffer_alignment: u64,
    // Buffers collected so far, in the order they were added
    buffers: Vec<LanceBuffer>,
}
107
108impl OutOfLineBuffers {
109 pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
110 Self {
111 position: base_position,
112 buffer_alignment,
113 buffers: Vec::new(),
114 }
115 }
116
117 pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
118 let position = self.position;
119 self.position += buffer.len() as u64;
120 self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
121 self.buffers.push(buffer);
122 position
123 }
124
125 pub fn take_buffers(self) -> Vec<LanceBuffer> {
126 self.buffers
127 }
128
129 pub fn reset_position(&mut self, position: u64) {
130 self.position = position;
131 }
132}
133
/// A deferred encoding job that resolves to a fully encoded page.
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;
136
/// Encodes a stream of arrays for one top-level field into pages and columns.
///
/// Implementations accumulate data across `maybe_encode` calls and may emit
/// page-encoding tasks whenever enough data has been buffered.
pub trait FieldEncoder: Send {
    /// Offers an array to the encoder; returns zero or more tasks if the
    /// encoder decided to cut one or more pages from its buffered data.
    ///
    /// `row_number` is the row offset of `array` within the overall stream and
    /// `num_rows` its length; `external_buffers` receives any out-of-line data.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Forces any buffered data to be encoded into pages immediately.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Finishes the encoder, producing one `EncodedColumn` per column
    /// (column buffers, encoding description, and any final pages).
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// Number of physical columns this encoder produces.
    fn num_columns(&self) -> u32;
}
197
#[derive(Debug, Default)]
/// Hands out sequential column indices and records which field id each
/// assigned index belongs to.
pub struct ColumnIndexSequence {
    // Next index to hand out
    current_index: u32,
    // (field id, column index) pairs, in assignment order
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    /// Assigns the next available column index to `field_id`, records the
    /// pair in the mapping, and returns the assigned index.
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let assigned = self.current_index;
        self.current_index = assigned + 1;
        self.mapping.push((field_id, assigned));
        assigned
    }

    /// Consumes one column index without associating it with any field.
    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}
218
/// Tuning knobs that control how data is encoded into pages.
pub struct EncodingOptions {
    // Bytes an encoder may buffer per column before it must cut a page
    pub cache_bytes_per_column: u64,
    // Upper bound on the size of a single page, in bytes
    pub max_page_bytes: u64,
    // If true, encoders hold on to the original arrays (more memory,
    // fewer copies); if false they may compact/copy input data
    pub keep_original_array: bool,
    // Alignment for page/out-of-line buffers; must be a power of two and
    // at least MIN_PAGE_BUFFER_ALIGNMENT (validated in `encode_batch`)
    pub buffer_alignment: u64,

    // Target file format version; gates version-specific encodings
    pub version: LanceFileVersion,
}
242
243impl Default for EncodingOptions {
244 fn default() -> Self {
245 Self {
246 cache_bytes_per_column: 8 * 1024 * 1024,
247 max_page_bytes: 32 * 1024 * 1024,
248 keep_original_array: true,
249 buffer_alignment: 64,
250 version: LanceFileVersion::default(),
251 }
252 }
253}
254
255impl EncodingOptions {
256 pub fn support_large_chunk(&self) -> bool {
260 self.version >= LanceFileVersion::V2_2
261 }
262}
263
/// Decides which `FieldEncoder` to build for a given field.
///
/// Implementations may recurse into child fields; `encoding_strategy_root`
/// is the outermost strategy so nested fields can be dispatched through it.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Creates the encoder for `field`, claiming column indices from
    /// `column_index` (one per physical column, depth-first).
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}
289
290pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
291 match version.resolve() {
292 LanceFileVersion::Legacy => panic!(),
293 LanceFileVersion::V2_0 => Box::new(
294 crate::previous::encoder::CoreFieldEncodingStrategy::new(version),
295 ),
296 _ => Box::new(StructuralEncodingStrategy::with_version(version)),
297 }
298}
299
300pub fn default_encoding_strategy_with_params(
302 version: LanceFileVersion,
303 params: CompressionParams,
304) -> Result<Box<dyn FieldEncodingStrategy>> {
305 match version.resolve() {
306 LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input(
307 "Compression parameters are only supported in Lance file version 2.1 and later",
308 )),
309 _ => {
310 let compression_strategy =
311 Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version));
312 Ok(Box::new(StructuralEncodingStrategy {
313 compression_strategy,
314 version,
315 }))
316 }
317 }
318}
319
#[derive(Debug)]
/// The 2.1+ "structural" encoding strategy: dispatches each field to a
/// structural encoder (primitive, list, struct, map, blob, ...) based on
/// its data type.
pub struct StructuralEncodingStrategy {
    // Chooses the compression scheme used by leaf (primitive) encoders
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    // Target file version; gates version-specific encodings (e.g. 2.2 features)
    pub version: LanceFileVersion,
}
326
327#[allow(clippy::derivable_impls)]
330impl Default for StructuralEncodingStrategy {
331 fn default() -> Self {
332 Self {
333 compression_strategy: Arc::new(DefaultCompressionStrategy::new()),
334 version: LanceFileVersion::default(),
335 }
336 }
337}
338
impl StructuralEncodingStrategy {
    /// Creates a strategy targeting `version`, with the default compression
    /// heuristics configured for that version.
    pub fn with_version(version: LanceFileVersion) -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)),
            version,
        }
    }

    /// Returns true if `data_type` is handled by the primitive structural
    /// encoder.  Fixed-size lists are primitive iff their item type is
    /// (checked recursively, so e.g. FSL<FSL<Int32>> also qualifies).
    fn is_primitive_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::FixedSizeList(inner, _) => Self::is_primitive_type(inner.data_type()),
            _ => matches!(
                data_type,
                DataType::Boolean
                    | DataType::Date32
                    | DataType::Date64
                    | DataType::Decimal128(_, _)
                    | DataType::Decimal256(_, _)
                    | DataType::Duration(_)
                    | DataType::Float16
                    | DataType::Float32
                    | DataType::Float64
                    | DataType::Int16
                    | DataType::Int32
                    | DataType::Int64
                    | DataType::Int8
                    | DataType::Interval(_)
                    | DataType::Null
                    | DataType::Time32(_)
                    | DataType::Time64(_)
                    | DataType::Timestamp(_, _)
                    | DataType::UInt16
                    | DataType::UInt32
                    | DataType::UInt64
                    | DataType::UInt8
                    | DataType::FixedSizeBinary(_)
                    | DataType::Binary
                    | DataType::LargeBinary
                    | DataType::Utf8
                    | DataType::LargeUtf8,
            ),
        }
    }

    /// Recursively builds the encoder tree for `field`.
    ///
    /// `root_field_metadata` is the metadata of the top-level field; it is
    /// threaded unchanged through the recursion so that leaf (primitive)
    /// encoders can consult it.  Column indices are claimed from
    /// `column_index` in depth-first order.
    ///
    /// # Errors
    ///
    /// Returns an error for blob fields with unsupported types, for
    /// version-gated encodings (blob-v2 struct, FixedSizeList<Struct>, Map
    /// all require 2.2+), and for malformed Map schemas.
    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();

        // Blob fields take priority over the normal type-based dispatch.
        if field.is_blob() {
            match data_type {
                // v1 blob: raw binary payloads
                DataType::Binary | DataType::LargeBinary => {
                    return Ok(Box::new(BlobStructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                // v2 blob: struct-typed input, only valid for 2.2+ files
                DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => {
                    return Ok(Box::new(BlobV2StructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                DataType::Struct(_) => {
                    return Err(Error::invalid_input_source(
                        "Blob v2 struct input requires file version >= 2.2".into(),
                    ));
                }
                _ => {
                    return Err(Error::invalid_input_source(
                        format!(
                            "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}",
                            data_type
                        )
                        .into(),
                    ));
                }
            }
        }

        // Leaf types (including FSL of leaf types) go straight to the
        // primitive encoder.
        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    // Recurse into the single item child, then wrap it.
                    let child = field.children.first().expect_ok()?;
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                // FSL<Struct> is not primitive (see is_primitive_type) and
                // needs its own structural encoder, available since 2.2.
                DataType::FixedSizeList(inner, _)
                    if matches!(inner.data_type(), DataType::Struct(_)) =>
                {
                    if self.version < LanceFileVersion::V2_2 {
                        return Err(Error::not_supported_source(format!(
                            "FixedSizeList<Struct> is only supported in Lance file format 2.2+, current version: {}",
                            self.version
                        )
                        .into()));
                    }
                    let child = field.children.first().expect_ok()?;
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(FixedSizeListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Map(_, keys_sorted) => {
                    // Sorted-key maps are not supported yet.
                    if keys_sorted {
                        return Err(Error::not_supported_source(format!("Map data type is not supported with keys_sorted=true now, current value is {}", keys_sorted).into()));
                    }
                    if self.version < LanceFileVersion::V2_2 {
                        return Err(Error::not_supported_source(format!(
                            "Map data type is only supported in Lance file format 2.2+, current version: {}",
                            self.version
                        )
                        .into()));
                    }
                    // Validate the Arrow Map shape: a single "entries" child
                    // that is a Struct<key, value> with a non-nullable key.
                    let entries_child = field.children.first().ok_or_else(|| {
                        Error::schema("Map should have an entries child".to_string())
                    })?;
                    let DataType::Struct(struct_fields) = entries_child.data_type() else {
                        return Err(Error::schema(
                            "Map entries field must be a Struct<key, value>".to_string(),
                        ));
                    };
                    if struct_fields.len() < 2 {
                        return Err(Error::schema(
                            "Map entries struct must contain both key and value fields".to_string(),
                        ));
                    }
                    let key_field = &struct_fields[0];
                    if key_field.is_nullable() {
                        return Err(Error::schema(format!(
                            "Map key field '{}' must be non-nullable according to Arrow Map specification",
                            key_field.name()
                        )));
                    }
                    // Encode the entries struct and wrap it as a map.
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        entries_child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(MapStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Struct(fields) => {
                    // Packed structs and empty structs are encoded as a
                    // single primitive column; otherwise one encoder per child.
                    if field.is_packed_struct() || fields.is_empty() {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(
                            options.keep_original_array,
                            children_encoders,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    // Dictionaries of primitive values are handled by the
                    // primitive encoder; logical value types are not supported.
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        Err(Error::not_supported_source(format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into()))
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}
577
impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        // The top-level field's metadata is passed down as the root metadata
        // for the entire encoder subtree built for this field.
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}
595
/// Holds one `FieldEncoder` per top-level field of a schema, plus the
/// mapping from field ids to the column indices they were assigned.
pub struct BatchEncoder {
    // One encoder per top-level field, in schema order
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    // (field id, column index) pairs recorded while assigning columns
    pub field_id_to_column_index: Vec<(u32, u32)>,
}
602
603impl BatchEncoder {
604 pub fn try_new(
605 schema: &Schema,
606 strategy: &dyn FieldEncodingStrategy,
607 options: &EncodingOptions,
608 ) -> Result<Self> {
609 let mut col_idx = 0;
610 let mut col_idx_sequence = ColumnIndexSequence::default();
611 let field_encoders = schema
612 .fields
613 .iter()
614 .map(|field| {
615 let encoder = strategy.create_field_encoder(
616 strategy,
617 field,
618 &mut col_idx_sequence,
619 options,
620 )?;
621 col_idx += encoder.as_ref().num_columns();
622 Ok(encoder)
623 })
624 .collect::<Result<Vec<_>>>()?;
625 Ok(Self {
626 field_encoders,
627 field_id_to_column_index: col_idx_sequence.mapping,
628 })
629 }
630
631 pub fn num_columns(&self) -> u32 {
632 self.field_encoders
633 .iter()
634 .map(|field_encoder| field_encoder.num_columns())
635 .sum::<u32>()
636 }
637}
638
#[derive(Debug)]
/// A fully encoded record batch: one contiguous data region plus the page
/// table needed to decode it (used by `encode_batch`, primarily for tests
/// and in-memory round trips).
pub struct EncodedBatch {
    // All page/column/out-of-line buffers concatenated into one region
    pub data: Bytes,
    // Per-column metadata (buffer offsets, page infos, encodings)
    pub page_table: Vec<Arc<ColumnInfo>>,
    // The Lance schema describing the encoded data
    pub schema: Arc<Schema>,
    // Column index of each top-level field, in field order
    pub top_level_columns: Vec<u32>,
    // Number of rows in the batch
    pub num_rows: u64,
}
650
651fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
652 let buffers = page.data;
653 let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
654 for buffer in buffers {
655 let buffer_offset = data_buffer.len() as u64;
656 data_buffer.extend_from_slice(&buffer);
657 let size = data_buffer.len() as u64 - buffer_offset;
658 buffer_offsets_and_sizes.push((buffer_offset, size));
659 }
660
661 PageInfo {
662 buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
663 encoding: page.description,
664 num_rows: page.num_rows,
665 priority: page.row_number,
666 }
667}
668
/// Encodes an entire `RecordBatch` into a single in-memory data region plus
/// a page table describing it.
///
/// Each top-level array is fed to its field encoder in one shot (encode,
/// flush, then finish), so `keep_original_array` is forced to `true`.
///
/// # Errors
///
/// Fails if `options.buffer_alignment` is not a power of two of at least
/// `MIN_PAGE_BUFFER_ALIGNMENT`, or if schema conversion / any encoder step
/// fails.
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::invalid_input_source(
            format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
        ));
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    // Force keep_original_array: the whole array is passed in one call, so
    // there is no benefit to copying it into encoder-owned buffers.
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        // Feed the full array, then flush so every row ends up in some page.
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        // NOTE(review): out-of-line buffers are appended without alignment
        // padding here, while OutOfLineBuffers::add_buffer records positions
        // that account for padding — presumably fine when buffer_alignment
        // divides every buffer length; TODO confirm.
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        // Group finished pages by the column index they belong to.
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        // Finish the encoder, which may emit column buffers and final pages.
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            // Shift the encoder-local index by the columns consumed by
            // previous fields to get the global column index.
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            // Claim this column's pages (leaving an empty Vec behind).
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    // Each top-level field's first assigned column index.
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}
767
768#[cfg(test)]
769mod tests {
770 use super::*;
771 use crate::compression_config::{CompressionFieldParams, CompressionParams};
772 use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Fields as ArrowFields};
773
    #[test]
    fn test_configured_encoding_strategy() {
        // A wildcard column rule with custom compression parameters.
        let mut params = CompressionParams::new();
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: None,
                minichunk_size: None,
            },
        );

        // 2.1+ accepts compression parameters...
        let strategy =
            default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone())
                .expect("Should succeed for V2.1");

        assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy"));
        assert!(format!("{:?}", strategy).contains("DefaultCompressionStrategy"));

        // ...while 2.0 and legacy must be rejected.
        let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone())
            .expect_err("Should fail for V2.0");
        assert!(
            err.to_string()
                .contains("only supported in Lance file version 2.1")
        );

        let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params)
            .expect_err("Should fail for Legacy");
        assert!(
            err.to_string()
                .contains("only supported in Lance file version 2.1")
        );
    }
814
    #[test]
    fn test_fixed_size_list_struct_requires_v2_2() {
        // Build a FixedSizeList<Struct<x: Int32>> field, which is gated
        // behind file version 2.2.
        let list_item = ArrowField::new(
            "item",
            ArrowDataType::Struct(ArrowFields::from(vec![ArrowField::new(
                "x",
                ArrowDataType::Int32,
                true,
            )])),
            true,
        );
        let arrow_field = ArrowField::new(
            "list_struct",
            ArrowDataType::FixedSizeList(Arc::new(list_item), 2),
            true,
        );
        let field = Field::try_from(&arrow_field).unwrap();

        // A 2.1 strategy must reject this type.
        let strategy = StructuralEncodingStrategy::with_version(LanceFileVersion::V2_1);
        let mut column_index = ColumnIndexSequence::default();
        let options = EncodingOptions::default();

        let result = strategy.create_field_encoder(&strategy, &field, &mut column_index, &options);
        assert!(
            result.is_err(),
            "FixedSizeList<Struct> should be rejected for file version 2.1"
        );
        let err = result.err().unwrap();

        assert!(
            err.to_string()
                .contains("FixedSizeList<Struct> is only supported in Lance file format 2.2+")
        );
    }
849}