use std::{collections::HashMap, sync::Arc};

use arrow_array::{Array, ArrayRef, RecordBatch};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
use futures::future::BoxFuture;
use lance_core::datatypes::{Field, Schema};
use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
use lance_core::{Error, Result};
use snafu::location;

use crate::buffer::LanceBuffer;
use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
use crate::compression_config::CompressionParams;
use crate::decoder::PageEncoding;
use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder};
use crate::encodings::logical::list::ListStructuralEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::repdef::RepDefBuilder;
use crate::version::LanceFileVersion;
use crate::{
    decoder::{ColumnInfo, PageInfo},
    format::pb,
};

/// The minimum alignment (in bytes) required for out-of-line page buffers.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 8;

/// An encoded page of data, ready to be written to the file.
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded data buffers that make up the page.
    pub data: Vec<LanceBuffer>,
    /// A description of the encoding used to encode the page.
    pub description: PageEncoding,
    /// The number of rows in the page.
    pub num_rows: u64,
    /// The row number of the first row in the page (used as the page's decode priority).
    pub row_number: u64,
    /// The index of the column this page belongs to.
    pub column_idx: u32,
}

/// An encoded column, consisting of column-level buffers, a column encoding
/// description, and any pages that are only produced when the encoder finishes.
pub struct EncodedColumn {
    pub column_buffers: Vec<LanceBuffer>,
    pub encoding: pb::ColumnEncoding,
    pub final_pages: Vec<EncodedPage>,
}

impl Default for EncodedColumn {
    fn default() -> Self {
        Self {
            column_buffers: Default::default(),
            encoding: pb::ColumnEncoding {
                column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
            },
            final_pages: Default::default(),
        }
    }
}

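/// A collection of buffers that will be written to the file outside of any page.
///
/// Each added buffer is assigned a position in the file; positions advance by the
/// buffer's length plus any padding needed to keep the next buffer aligned.
///
/// A minimal usage sketch (not compiled; assumes a `LanceBuffer` can be constructed
/// from an owned `Vec<u8>`):
///
/// ```ignore
/// let mut buffers = OutOfLineBuffers::new(/*base_position=*/ 0, /*buffer_alignment=*/ 8);
/// // The first buffer lands at the base position.
/// let pos = buffers.add_buffer(LanceBuffer::Owned(vec![0u8; 5]));
/// assert_eq!(pos, 0);
/// // The next position is rounded up to the 8-byte alignment.
/// let next = buffers.add_buffer(LanceBuffer::Owned(vec![0u8; 3]));
/// assert_eq!(next, 8);
/// let all_buffers = buffers.take_buffers();
/// ```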
pub struct OutOfLineBuffers {
    position: u64,
    buffer_alignment: u64,
    buffers: Vec<LanceBuffer>,
}

impl OutOfLineBuffers {
    pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
        Self {
            position: base_position,
            buffer_alignment,
            buffers: Vec::new(),
        }
    }

    pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
        let position = self.position;
        self.position += buffer.len() as u64;
        self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
        self.buffers.push(buffer);
        position
    }

    pub fn take_buffers(self) -> Vec<LanceBuffer> {
        self.buffers
    }

    pub fn reset_position(&mut self, position: u64) {
        self.position = position;
    }
}

/// A task which, when awaited, produces an encoded page.
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;

/// Top-level encoding trait for a single field.
///
/// Data is pushed to the encoder in batches via `maybe_encode`, which may buffer data
/// and return zero or more encode tasks.  `flush` forces any buffered data into tasks,
/// and `finish` is called once all data has been delivered to produce the final
/// column-level buffers and any remaining pages.
pub trait FieldEncoder: Send {
    /// Buffer the array, emitting encode tasks if enough data has accumulated.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Flush any remaining buffered data into encode tasks.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Finish encoding and return the encoded columns, one per column produced.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// The number of columns this encoder produces.
    fn num_columns(&self) -> u32;
}

/// Assigns column indices to fields and records the field id -> column index mapping.
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    current_index: u32,
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let idx = self.current_index;
        self.current_index += 1;
        self.mapping.push((field_id, idx));
        idx
    }

    pub fn skip(&mut self) {
        self.current_index += 1;
    }
}

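/// Options that control how data is encoded.
///
/// A minimal sketch (not compiled) showing how a single knob can be overridden while
/// keeping the remaining defaults:
///
/// ```ignore
/// let options = EncodingOptions {
///     max_page_bytes: 8 * 1024 * 1024,
///     ..Default::default()
/// };
/// ```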
pub struct EncodingOptions {
    /// The number of bytes a column may buffer before it is flushed into a page.
    pub cache_bytes_per_column: u64,
    /// The maximum size, in bytes, of a single page.
    pub max_page_bytes: u64,
    /// If true, the encoder keeps a reference to the original array instead of copying it.
    pub keep_original_array: bool,
    /// The alignment, in bytes, applied to out-of-line buffers (must be a power of two
    /// and at least [`MIN_PAGE_BUFFER_ALIGNMENT`]).
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    fn default() -> Self {
        Self {
            cache_bytes_per_column: 8 * 1024 * 1024,
            max_page_bytes: 32 * 1024 * 1024,
            keep_original_array: true,
            buffer_alignment: 64,
        }
    }
}

/// A trait for creating field encoders.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Create a field encoder for the given field.
    ///
    /// `encoding_strategy_root` is the top-level strategy, which should be used when
    /// recursing into child fields so that the root strategy stays in control.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}

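/// Pick the default encoding strategy for a given file version.
///
/// A minimal usage sketch (not compiled):
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
/// ```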
pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
    match version.resolve() {
        LanceFileVersion::Legacy => panic!(),
        LanceFileVersion::V2_0 => Box::new(
            crate::previous::encoder::CoreFieldEncodingStrategy::new(version),
        ),
        _ => Box::new(StructuralEncodingStrategy::with_version(version)),
    }
}

/// Create an encoding strategy that honors user-supplied compression parameters.
///
/// Compression parameters are only supported for file version 2.1 and later.
pub fn default_encoding_strategy_with_params(
    version: LanceFileVersion,
    params: CompressionParams,
) -> Result<Box<dyn FieldEncodingStrategy>> {
    match version.resolve() {
        LanceFileVersion::Legacy | LanceFileVersion::V2_0 => Err(Error::invalid_input(
            "Compression parameters are only supported in Lance file version 2.1 and later",
            location!(),
        )),
        _ => {
            let compression_strategy =
                Arc::new(DefaultCompressionStrategy::with_params(params).with_version(version));
            Ok(Box::new(StructuralEncodingStrategy {
                compression_strategy,
                version,
            }))
        }
    }
}

/// The default encoding strategy for 2.1+ files.
///
/// Builds a structural encoder for each field and delegates compression choices to the
/// configured [`CompressionStrategy`].
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    pub version: LanceFileVersion,
}

#[allow(clippy::derivable_impls)]
impl Default for StructuralEncodingStrategy {
    fn default() -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new()),
            version: LanceFileVersion::default(),
        }
    }
}

impl StructuralEncodingStrategy {
    pub fn with_version(version: LanceFileVersion) -> Self {
        Self {
            compression_strategy: Arc::new(DefaultCompressionStrategy::new().with_version(version)),
            version,
        }
    }

    fn is_primitive_type(data_type: &DataType) -> bool {
        matches!(
            data_type,
            DataType::Boolean
                | DataType::Date32
                | DataType::Date64
                | DataType::Decimal128(_, _)
                | DataType::Decimal256(_, _)
                | DataType::Duration(_)
                | DataType::Float16
                | DataType::Float32
                | DataType::Float64
                | DataType::Int16
                | DataType::Int32
                | DataType::Int64
                | DataType::Int8
                | DataType::Interval(_)
                | DataType::Null
                | DataType::Time32(_)
                | DataType::Time64(_)
                | DataType::Timestamp(_, _)
                | DataType::UInt16
                | DataType::UInt32
                | DataType::UInt64
                | DataType::UInt8
                | DataType::FixedSizeBinary(_)
                | DataType::FixedSizeList(_, _)
                | DataType::Binary
                | DataType::LargeBinary
                | DataType::Utf8
                | DataType::LargeUtf8,
        )
    }

    fn do_create_field_encoder(
        &self,
        _encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
        root_field_metadata: &HashMap<String, String>,
    ) -> Result<Box<dyn FieldEncoder>> {
        let data_type = field.data_type();

        // Blob fields are handled by dedicated blob encoders.
        if field.is_blob() {
            match data_type {
                DataType::Binary | DataType::LargeBinary => {
                    return Ok(Box::new(BlobStructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                DataType::Struct(_) if self.version >= LanceFileVersion::V2_2 => {
                    return Ok(Box::new(BlobV2StructuralEncoder::new(
                        field,
                        column_index.next_column_index(field.id as u32),
                        options,
                        self.compression_strategy.clone(),
                    )?));
                }
                DataType::Struct(_) => {
                    return Err(Error::InvalidInput {
                        source: "Blob v2 struct input requires file version >= 2.2".into(),
                        location: location!(),
                    });
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: format!(
                            "Blob encoding only supports Binary/LargeBinary or v2 Struct, got {}",
                            data_type
                        )
                        .into(),
                        location: location!(),
                    });
                }
            }
        }

        if Self::is_primitive_type(&data_type) {
            Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                options,
                self.compression_strategy.clone(),
                column_index.next_column_index(field.id as u32),
                field.clone(),
                Arc::new(root_field_metadata.clone()),
            )?))
        } else {
            match data_type {
                DataType::List(_) | DataType::LargeList(_) => {
                    let child = field.children.first().expect("List should have a child");
                    let child_encoder = self.do_create_field_encoder(
                        _encoding_strategy_root,
                        child,
                        column_index,
                        options,
                        root_field_metadata,
                    )?;
                    Ok(Box::new(ListStructuralEncoder::new(
                        options.keep_original_array,
                        child_encoder,
                    )))
                }
                DataType::Struct(fields) => {
                    if field.is_packed_struct() || fields.is_empty() {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        let children_encoders = field
                            .children
                            .iter()
                            .map(|field| {
                                self.do_create_field_encoder(
                                    _encoding_strategy_root,
                                    field,
                                    column_index,
                                    options,
                                    root_field_metadata,
                                )
                            })
                            .collect::<Result<Vec<_>>>()?;
                        Ok(Box::new(StructStructuralEncoder::new(
                            options.keep_original_array,
                            children_encoders,
                        )))
                    }
                }
                DataType::Dictionary(_, value_type) => {
                    if Self::is_primitive_type(&value_type) {
                        Ok(Box::new(PrimitiveStructuralEncoder::try_new(
                            options,
                            self.compression_strategy.clone(),
                            column_index.next_column_index(field.id as u32),
                            field.clone(),
                            Arc::new(root_field_metadata.clone()),
                        )?))
                    } else {
                        Err(Error::NotSupported {
                            source: format!(
                                "cannot encode a dictionary column whose value type is a logical type ({})",
                                value_type
                            )
                            .into(),
                            location: location!(),
                        })
                    }
                }
                _ => todo!("Implement encoding for field {}", field),
            }
        }
    }
}

impl FieldEncodingStrategy for StructuralEncodingStrategy {
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>> {
        self.do_create_field_encoder(
            encoding_strategy_root,
            field,
            column_index,
            options,
            &field.metadata,
        )
    }
}

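/// A batch encoder that encodes a record batch, one field encoder per top-level field.
///
/// A minimal sketch (not compiled; `lance_schema` is an assumed, already-built
/// [`lance_core::datatypes::Schema`]):
///
/// ```ignore
/// let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
/// let options = EncodingOptions::default();
/// let encoder = BatchEncoder::try_new(&lance_schema, strategy.as_ref(), &options)?;
/// assert_eq!(encoder.field_encoders.len(), lance_schema.fields.len());
/// ```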
pub struct BatchEncoder {
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    /// Mapping from field id to the column index assigned to that field.
    pub field_id_to_column_index: Vec<(u32, u32)>,
}

impl BatchEncoder {
    pub fn try_new(
        schema: &Schema,
        strategy: &dyn FieldEncodingStrategy,
        options: &EncodingOptions,
    ) -> Result<Self> {
        let mut col_idx = 0;
        let mut col_idx_sequence = ColumnIndexSequence::default();
        let field_encoders = schema
            .fields
            .iter()
            .map(|field| {
                let encoder = strategy.create_field_encoder(
                    strategy,
                    field,
                    &mut col_idx_sequence,
                    options,
                )?;
                col_idx += encoder.as_ref().num_columns();
                Ok(encoder)
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(Self {
            field_encoders,
            field_id_to_column_index: col_idx_sequence.mapping,
        })
    }

    pub fn num_columns(&self) -> u32 {
        self.field_encoders
            .iter()
            .map(|field_encoder| field_encoder.num_columns())
            .sum::<u32>()
    }
}

/// A fully encoded batch of data along with the page table and schema needed to decode it.
///
/// This is the in-memory result of [`encode_batch`]; it is not a complete Lance file.
#[derive(Debug)]
pub struct EncodedBatch {
    pub data: Bytes,
    pub page_table: Vec<Arc<ColumnInfo>>,
    pub schema: Arc<Schema>,
    pub top_level_columns: Vec<u32>,
    pub num_rows: u64,
}

fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
    let buffers = page.data;
    let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
    for buffer in buffers {
        let buffer_offset = data_buffer.len() as u64;
        data_buffer.extend_from_slice(&buffer);
        let size = data_buffer.len() as u64 - buffer_offset;
        buffer_offsets_and_sizes.push((buffer_offset, size));
    }

    PageInfo {
        buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
        encoding: page.description,
        num_rows: page.num_rows,
        priority: page.row_number,
    }
}

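/// Encode a record batch entirely in memory.
///
/// A minimal usage sketch (not compiled; assumes `batch` is an existing Arrow
/// [`RecordBatch`]):
///
/// ```ignore
/// let lance_schema = Arc::new(Schema::try_from(batch.schema().as_ref())?);
/// let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
/// let encoded =
///     encode_batch(&batch, lance_schema, strategy.as_ref(), &EncodingOptions::default()).await?;
/// assert_eq!(encoded.num_rows, batch.num_rows() as u64);
/// ```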
pub async fn encode_batch(
    batch: &RecordBatch,
    schema: Arc<Schema>,
    encoding_strategy: &dyn FieldEncodingStrategy,
    options: &EncodingOptions,
) -> Result<EncodedBatch> {
    if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
    {
        return Err(Error::InvalidInput {
            source: format!(
                "buffer_alignment must be a power of two and at least {}",
                MIN_PAGE_BUFFER_ALIGNMENT
            )
            .into(),
            location: location!(),
        });
    }

    let mut data_buffer = BytesMut::new();
    let lance_schema = Schema::try_from(batch.schema().as_ref())?;
    let options = EncodingOptions {
        keep_original_array: true,
        ..*options
    };
    let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
    let mut page_table = Vec::new();
    let mut col_idx_offset = 0;
    for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let repdef = RepDefBuilder::default();
        let encoder = encoder.as_mut();
        let num_rows = arr.len() as u64;
        // Push the entire array through the encoder and then flush so that all buffered
        // data becomes encode tasks.
        let mut tasks =
            encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
        tasks.extend(encoder.flush(&mut external_buffers)?);
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
        for task in tasks {
            let encoded_page = task.await?;
            pages
                .entry(encoded_page.column_idx)
                .or_default()
                .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
        }
        let mut external_buffers =
            OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
        let encoded_columns = encoder.finish(&mut external_buffers).await?;
        for buffer in external_buffers.take_buffers() {
            data_buffer.extend_from_slice(&buffer);
        }
        let num_columns = encoded_columns.len();
        for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
            let col_idx = col_idx + col_idx_offset;
            let mut col_buffer_offsets_and_sizes = Vec::new();
            for buffer in encoded_column.column_buffers {
                let buffer_offset = data_buffer.len() as u64;
                data_buffer.extend_from_slice(&buffer);
                let size = data_buffer.len() as u64 - buffer_offset;
                col_buffer_offsets_and_sizes.push((buffer_offset, size));
            }
            for page in encoded_column.final_pages {
                pages
                    .entry(page.column_idx)
                    .or_default()
                    .push(write_page_to_data_buffer(page, &mut data_buffer));
            }
            let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
            page_table.push(Arc::new(ColumnInfo {
                index: col_idx as u32,
                buffer_offsets_and_sizes: Arc::from(
                    col_buffer_offsets_and_sizes.into_boxed_slice(),
                ),
                page_infos: Arc::from(col_pages.into_boxed_slice()),
                encoding: encoded_column.encoding,
            }))
        }
        col_idx_offset += num_columns;
    }
    let top_level_columns = batch_encoder
        .field_id_to_column_index
        .iter()
        .map(|(_, idx)| *idx)
        .collect();
    Ok(EncodedBatch {
        data: data_buffer.freeze(),
        top_level_columns,
        page_table,
        schema,
        num_rows: batch.num_rows() as u64,
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression_config::{CompressionFieldParams, CompressionParams};

    #[test]
    fn test_configured_encoding_strategy() {
        let mut params = CompressionParams::new();
        params.columns.insert(
            "*_id".to_string(),
            CompressionFieldParams {
                rle_threshold: Some(0.5),
                compression: Some("lz4".to_string()),
                compression_level: None,
                bss: None,
            },
        );

        let strategy =
            default_encoding_strategy_with_params(LanceFileVersion::V2_1, params.clone())
                .expect("Should succeed for V2.1");

        assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy"));
        assert!(format!("{:?}", strategy).contains("DefaultCompressionStrategy"));

        let err = default_encoding_strategy_with_params(LanceFileVersion::V2_0, params.clone())
            .expect_err("Should fail for V2.0");
        assert!(err
            .to_string()
            .contains("only supported in Lance file version 2.1"));

        let err = default_encoding_strategy_with_params(LanceFileVersion::Legacy, params)
            .expect_err("Should fail for Legacy");
        assert!(err
            .to_string()
            .contains("only supported in Lance file version 2.1"));
    }
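
    // A small additional sanity check (a sketch added here, not part of the original
    // test suite): the parameter-free constructor should also pick the structural
    // strategy for 2.1 files.
    #[test]
    fn test_default_encoding_strategy_is_structural_for_2_1() {
        let strategy = default_encoding_strategy(LanceFileVersion::V2_1);
        assert!(format!("{:?}", strategy).contains("StructuralEncodingStrategy"));
    }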
}