use std::{collections::HashMap, sync::Arc};

use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{Field, Schema};
use lance_core::error::LanceOptionExt;
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
use crate::compression_config::CompressionParams;
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder};
use crate::encodings::logical::fixed_size_list::FixedSizeListStructuralEncoder;
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::map::MapStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::repdef::RepDefBuilder;
use crate::version::LanceFileVersion;
use crate::{
    decoder::{ColumnInfo, PageInfo},
    format::pb,
};

/// The minimum allowed value for `EncodingOptions::buffer_alignment`
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;

/// An encoded page of data, ready to be written to the file
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded buffers of data
    pub data: Vec<LanceBuffer>,
    /// A description of the encoding used to encode the page
    pub description: PageEncoding,
    /// The number of rows in the page
    pub num_rows: u64,
    /// The file-wide row number of the first row in the page
    ///
    /// This is used as the page's priority when it is written
    pub row_number: u64,
    /// The index of the column this page belongs to
    pub column_idx: u32,
}

/// An encoded column of data, consisting of column-level buffers, the column
/// encoding description, and any final pages
pub struct EncodedColumn {
    pub column_buffers: Vec<LanceBuffer>,
    pub encoding: pb::ColumnEncoding,
    pub final_pages: Vec<EncodedPage>,
}

impl Default for EncodedColumn {
    fn default() -> Self {
        Self {
            column_buffers: Default::default(),
            encoding: pb::ColumnEncoding {
                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
            },
            final_pages: Default::default(),
        }
    }
}

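/// A collection of buffers that will be written to the file outside of any
/// page, together with a cursor tracking where the next buffer will land.
///
/// Each buffer is padded out to the configured alignment. A minimal sketch of
/// the position arithmetic, assuming `pad_bytes_to` pads up to the next
/// multiple of the alignment (the buffer values are illustrative):
///
/// ```ignore
/// let mut buffers = OutOfLineBuffers::new(/* base_position */ 0, /* buffer_alignment */ 8);
/// let first = buffers.add_buffer(ten_byte_buffer); // returns position 0
/// let second = buffers.add_buffer(next_buffer); // returns position 16 (10 padded up to 16)
/// let all = buffers.take_buffers(); // the caller is responsible for writing these
/// ```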
pub struct OutOfLineBuffers {
    position: u64,
    buffer_alignment: u64,
    buffers: Vec<LanceBuffer>,
}

impl OutOfLineBuffers {
    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
        Self {
            position: base_position,
            buffer_alignment,
            buffers: Vec::new(),
        }
    }

    /// Adds a buffer, returning the file position it was assigned
    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
        let position = self.position;
        self.position += buffer.len() as u64;
        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
        self.buffers.push(buffer);
        position
    }

    pub fn take_buffers(self) -> Vec<LanceBuffer> {
        self.buffers
    }

    pub fn reset_position(&mut self, position: u64) {
        self.position = position;
    }
}

/// A task that will resolve to an encoded page once awaited
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;

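/// The top-level trait for encoding a field of Arrow data into pages.
///
/// An encoder is fed arrays one at a time. `maybe_encode` may buffer the data
/// and return no tasks, or return tasks for any pages that are now full.
/// `flush` converts everything still buffered into tasks, and `finish` is
/// called once at the end to produce the column-level metadata. A sketch of
/// the driver loop (the `array` and `external_buffers` values are
/// illustrative):
///
/// ```ignore
/// let num_rows = array.len() as u64;
/// let mut tasks =
///     encoder.maybe_encode(array, &mut external_buffers, RepDefBuilder::default(), 0, num_rows)?;
/// tasks.extend(encoder.flush(&mut external_buffers)?);
/// let pages = futures::future::try_join_all(tasks).await?;
/// let columns = encoder.finish(&mut external_buffers).await?;
/// ```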
pub trait FieldEncoder: Send {
    /// Buffers the given array, returning encode tasks for any pages that are ready
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Converts any remaining buffered data into encode tasks
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Finishes encoding, returning the column-level metadata and any final pages
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of columns this encoder will write
    fn num_columns(&self) -> u32;
}

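/// Hands out sequential column indices, recording which field id received each
/// index. For example:
///
/// ```ignore
/// let mut columns = ColumnIndexSequence::default();
/// assert_eq!(columns.next_column_index(/* field_id */ 5), 0);
/// assert_eq!(columns.next_column_index(/* field_id */ 7), 1);
/// // the recorded mapping is now [(5, 0), (7, 1)]
/// ```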
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    current_index: u32,
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let idx = self.current_index;
        self.current_index += 1;
        self.mapping.push((field_id, idx));
        idx
    }

    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}

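/// Options that control the encoding process.
///
/// Callers usually start from `EncodingOptions::default()` and override only
/// what they need, e.g.:
///
/// ```ignore
/// let options = EncodingOptions {
///     max_page_bytes: 8 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```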
pub struct EncodingOptions {
    /// The approximate number of bytes to buffer per column before flushing a page
    pub cache_bytes_per_column: u64,
    /// The maximum size, in bytes, of a single page
    pub max_page_bytes: u64,
    /// If true, encoders keep references to the original arrays instead of
    /// copying data out of them (more memory, fewer copies)
    pub keep_original_array: bool,
    /// The alignment, in bytes, of buffers written to the file
    pub buffer_alignment: u64,

    /// The file version being written
    pub version: LanceFileVersion,
}

impl Default for EncodingOptions {
    fn default() -> Self {
        Self {
            cache_bytes_per_column: 8 * 1024 * 1024,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
            version: LanceFileVersion::default(),
        }
    }
}

impl EncodingOptions {
    /// True if the target file version supports large chunks (2.2+)
    pub fn support_large_chunk(&self) -> bool {
        self.version >= LanceFileVersion::V2_2
    }
}

/// A strategy for choosing the encoder to use for a given field
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Creates a `FieldEncoder` for the given field.
    ///
    /// `encoding_strategy_root` is the top-level strategy, passed down so that
    /// encoders for nested fields can route their children back through the
    /// root strategy.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}

/// Creates the default encoding strategy for the given file version
pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
    match version.resolve() {
        LanceFileVersion::Legacy => {
            panic!("the legacy file version does not have an encoding strategy")
        }
        LanceFileVersion::V2_0 => Box::new(
            crate::previous::encoder::CoreFieldEncodingStrategy::new(version),
        ),
        _ => Box::new(StructuralEncodingStrategy::with_version(version)),
    }
}

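/// Creates an encoding strategy that honors user-supplied compression
/// parameters (2.1+ files only). For example (the column name and codec are
/// illustrative):
///
/// ```ignore
/// let mut params = CompressionParams::new();
/// params.columns.insert(
///     "embedding".to_string(),
///     CompressionFieldParams {
///         rle_threshold: None,
///         compression: Some("zstd".to_string()),
///         compression_level: None,
///         bss: None,
///         minichunk_size: None,
///     },
/// );
/// let strategy = default_encoding_strategy_with_params(LanceFileVersion::V2_1, params)?;
/// ```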
pub fn default_encoding_strategy_with_params(
    version: LanceFileVersion,
    params: CompressionParams,
) -> Result<Box<dyn FieldEncodingStrategy>> {
    match version.resolve() {
        LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input(
            "Compression parameters are only supported in Lance file version 2.1 and later",
            location!(),
        )),
        _ => {
            let compression_strategy =
                Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version));
            Ok(Box::new(StructuralEncodingStrategy {
                compression_strategy,
                version,
            }))
        }
    }
}

/// The default encoding strategy for 2.1+ files, dispatching each field to a
/// structural encoder paired with a compression strategy
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    pub version: LanceFileVersion,
}

#[allow(clippy::derivable_impls)]
impl Default for StructuralEncodingStrategy {
    fn default() -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new()),
            version: LanceFileVersion::default(),
        }
    }
}

impl StructuralEncodingStrategy {
    pub fn with_version(version: LanceFileVersion) -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)),
            version,
        }
    }

    /// True if the type (or, for fixed-size lists, the innermost item type)
    /// can be handled directly by the primitive structural encoder
    fn is_primitive_type(data_type: &DataType) -> bool {
        match data_type {
            DataType::FixedSizeList(inner, _) => Self::is_primitive_type(inner.data_type()),
            _ => matches!(
                data_type,
                DataType::Boolean
                    | DataType::Date32
                    | DataType::Date64
                    | DataType::Decimal128(_, _)
                    | DataType::Decimal256(_, _)
                    | DataType::Duration(_)
                    | DataType::Float16
                    | DataType::Float32
                    | DataType::Float64
                    | DataType::Int16
                    | DataType::Int32
                    | DataType::Int64
                    | DataType::Int8
                    | DataType::Interval(_)
                    | DataType::Null
                    | DataType::Time32(_)
                    | DataType::Time64(_)
                    | DataType::Timestamp(_, _)
                    | DataType::UInt16
                    | DataType::UInt32
                    | DataType::UInt64
                    | DataType::UInt8
                    | DataType::FixedSizeBinary(_)
                    | DataType::Binary
                    | DataType::LargeBinary
                    | DataType::Utf8
                    | DataType::LargeUtf8,
            ),
        }
    }

    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();

        // Blob fields use dedicated encoders instead of the normal type dispatch
        if field.is_blob() {
            match data_type {
                DataType::Binary | DataType::LargeBinary => {
                    return Ok(Box::new(BlobStructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => {
                    return Ok(Box::new(BlobV2StructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                DataType::Struct(_) => {
                    return Err(Error::InvalidInput {
                        source: "Blob v2 struct input requires file version >= 2.2".into(),
                        location: location!(),
                    });
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!(
                            "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}",
                            data_type
                        )
                        .into(),
                        location: location!(),
                    });
                }
            }
        }

        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    let child = field.children.first().expect_ok()?;
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::FixedSizeList(inner, _)
                    if matches!(inner.data_type(), DataType::Struct(_)) =>
                {
                    if self.version < LanceFileVersion::V2_2 {
                        return Err(Error::NotSupported {
                            source: format!(
                                "FixedSizeList<Struct> is only supported in Lance file format 2.2+, current version: {}",
                                self.version
                            )
                            .into(),
                            location: location!(),
                        });
                    }
                    let child = field.children.first().expect_ok()?;
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(FixedSizeListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Map(_, keys_sorted) => {
                    if keys_sorted {
                        return Err(Error::NotSupported {
                            source: "Map data type with keys_sorted=true is not yet supported"
                                .into(),
                            location: location!(),
                        });
                    }
                    if self.version < LanceFileVersion::V2_2 {
                        return Err(Error::NotSupported {
                            source: format!(
                                "Map data type is only supported in Lance file format 2.2+, current version: {}",
                                self.version
                            )
                            .into(),
                            location: location!(),
                        });
                    }
                    let entries_child = field.children.first().ok_or_else(|| Error::Schema {
                        message: "Map should have an entries child".to_string(),
                        location: location!(),
                    })?;
                    let DataType::Struct(struct_fields) = entries_child.data_type() else {
                        return Err(Error::Schema {
                            message: "Map entries field must be a Struct<key, value>".to_string(),
                            location: location!(),
                        });
                    };
                    if struct_fields.len() < 2 {
                        return Err(Error::Schema {
                            message: "Map entries struct must contain both key and value fields"
                                .to_string(),
                            location: location!(),
                        });
                    }
                    let key_field = &struct_fields[0];
                    if key_field.is_nullable() {
                        return Err(Error::Schema {
                            message: format!(
                                "Map key field '{}' must be non-nullable according to the Arrow Map specification",
                                key_field.name()
                            ),
                            location: location!(),
                        });
                    }
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        entries_child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(MapStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Struct(fields) => {
                    // Packed structs (and empty structs) are encoded as a single
                    // primitive column rather than one column per child
                    if field.is_packed_struct() || fields.is_empty() {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(
                            options.keep_original_array,
                            children_encoders,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        Err(Error::NotSupported {
                            source: format!(
                                "cannot encode a dictionary column whose value type is a logical type ({})",
                                value_type
                            )
                            .into(),
                            location: location!(),
                        })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}

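/// Encodes a whole batch by creating one `FieldEncoder` per top-level field in
/// the schema and tracking the mapping from field ids to column indices.
///
/// A sketch of constructing one (the `lance_schema` value is illustrative):
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
/// let encoder =
///     BatchEncoder::try_new(&lance_schema, strategy.as_ref(), &EncodingOptions::default())?;
/// ```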
pub struct BatchEncoder {
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    pub field_id_to_column_index: Vec<(u32, u32)>,
}

impl BatchEncoder {
    pub fn try_new(
        schema: &Schema,
        strategy: &dyn FieldEncodingStrategy,
        options: &EncodingOptions,
    ) -> Result<Self> {
        let mut col_idx = 0;
        let mut col_idx_sequence = ColumnIndexSequence::default();
        let field_encoders = schema
            .fields
            .iter()
            .map(|field| {
                let encoder = strategy.create_field_encoder(
                    strategy,
                    field,
                    &mut col_idx_sequence,
                    options,
                )?;
                col_idx += encoder.as_ref().num_columns();
                Ok(encoder)
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            field_encoders,
            field_id_to_column_index: col_idx_sequence.mapping,
        })
    }

    pub fn num_columns(&self) -> u32 {
        self.field_encoders
            .iter()
            .map(|field_encoder| field_encoder.num_columns())
            .sum::<u32>()
    }
}

/// An encoded batch of data and the page table describing it
#[derive(Debug)]
pub struct EncodedBatch {
    pub data: Bytes,
    pub page_table: Vec<Arc<ColumnInfo>>,
    pub schema: Arc<Schema>,
    pub top_level_columns: Vec<u32>,
    pub num_rows: u64,
}

/// Writes a page's buffers into the data buffer and records their offsets and sizes
fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
    let buffers = page.data;
    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
    for buffer in buffers {
        let buffer_offset = data_buffer.len() as u64;
        data_buffer.extend_from_slice(&buffer);
        let size = data_buffer.len() as u64 - buffer_offset;
        buffer_offsets_and_sizes.push((buffer_offset, size));
    }

    PageInfo {
        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
        encoding: page.description,
        num_rows: page.num_rows,
        priority: page.row_number,
    }
}

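/// Encodes an entire record batch into memory, returning the encoded data
/// along with the page table that describes it.
///
/// A sketch of a typical call (the `batch` value and file version are
/// illustrative):
///
/// ```ignore
/// let schema = Arc::new(Schema::try_from(batch.schema().as_ref())?);
/// let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
/// let encoded =
///     encode_batch(&batch, schema, strategy.as_ref(), &EncodingOptions::default()).await?;
/// assert_eq!(encoded.num_rows, batch.num_rows() as u64);
/// ```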
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::InvalidInput {
            source: format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
            location: location!(),
        });
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression_config::{CompressionFieldParams, CompressionParams};

    #[test]
    fn test_configured_encoding_strategy() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: None,
                minichunk_size: None,
            },
        );

        let strategy =
            default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone())
                .expect("Should succeed for V2.1");

        assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy"));
        assert!(format!("{:?}", strategy).contains("DefaultCompressionStrategy"));

        let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone())
            .expect_err("Should fail for V2.0");
        assert!(err
            .to_string()
            .contains("only supported in Lance file version 2.1"));

        let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params)
            .expect_err("Should fail for Legacy");
        assert!(err
            .to_string()
            .contains("only supported in Lance file version 2.1"));
    }
}