1use std::{collections::HashMap, sync::Arc};
17
18use arrow_array::{Array, ArrayRef, RecordBatch};
19use arrow_schema::DataType;
20use bytes::{Bytes, BytesMut};
21use futures::future::BoxFuture;
22use lance_core::datatypes::{Field, Schema};
23use lance_core::utils::bit::{is_pwr_two, pad_bytes_to};
24use lance_core::{Error, Result};
25use snafu::location;
26
27use crate::buffer::LanceBuffer;
28use crate::compression::{CompressionStrategy, DefaultCompressionStrategy};
29use crate::decoder::PageEncoding;
30use crate::encodings::logical::list::ListStructuralEncoder;
31use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
32use crate::encodings::logical::r#struct::StructStructuralEncoder;
33use crate::repdef::RepDefBuilder;
34use crate::version::LanceFileVersion;
35use crate::{
36 decoder::{ColumnInfo, PageInfo},
37 format::pb,
38};
39
/// Smallest `buffer_alignment` accepted by [`encode_batch`]; the alignment
/// must also be a power of two.
pub const MIN_PAGE_BUFFER_ALIGNMENT: u64 = 1 << 3;
42
/// The output of one page-encoding task ([`EncodeTask`]).
#[derive(Debug)]
pub struct EncodedPage {
    /// The page's buffers; `write_page_to_data_buffer` appends them back to
    /// back and records an `(offset, size)` pair for each.
    pub data: Vec<LanceBuffer>,
    /// How the page is encoded; copied into `PageInfo::encoding`.
    pub description: PageEncoding,
    /// Number of rows in the page; copied into `PageInfo::num_rows`.
    pub num_rows: u64,
    /// Copied into `PageInfo::priority`. Presumably the row number of the
    /// page's first row — NOTE(review): confirm against encoders.
    pub row_number: u64,
    /// Column this page belongs to; `encode_batch` groups pages by this value.
    pub column_idx: u32,
}
66
/// Column-level output returned by [`FieldEncoder::finish`].
pub struct EncodedColumn {
    /// Buffers that belong to the column as a whole rather than to a page;
    /// `encode_batch` records their offsets in `ColumnInfo::buffer_offsets_and_sizes`.
    pub column_buffers: Vec<LanceBuffer>,
    /// Column-level encoding description stored in `ColumnInfo::encoding`.
    pub encoding: pb::ColumnEncoding,
    /// Pages produced only at finish time; written after the column buffers.
    pub final_pages: Vec<EncodedPage>,
}
72
73impl Default for EncodedColumn {
74 fn default() -> Self {
75 Self {
76 column_buffers: Default::default(),
77 encoding: pb::ColumnEncoding {
78 column_encoding: Some(pb::column_encoding::ColumnEncoding::Values(())),
79 },
80 final_pages: Default::default(),
81 }
82 }
83}
84
85pub struct OutOfLineBuffers {
99 position: u64,
100 buffer_alignment: u64,
101 buffers: Vec<LanceBuffer>,
102}
103
104impl OutOfLineBuffers {
105 pub fn new(base_position: u64, buffer_alignment: u64) -> Self {
106 Self {
107 position: base_position,
108 buffer_alignment,
109 buffers: Vec::new(),
110 }
111 }
112
113 pub fn add_buffer(&mut self, buffer: LanceBuffer) -> u64 {
114 let position = self.position;
115 self.position += buffer.len() as u64;
116 self.position += pad_bytes_to(buffer.len(), self.buffer_alignment as usize) as u64;
117 self.buffers.push(buffer);
118 position
119 }
120
121 pub fn take_buffers(self) -> Vec<LanceBuffer> {
122 self.buffers
123 }
124
125 pub fn reset_position(&mut self, position: u64) {
126 self.position = position;
127 }
128}
129
/// A boxed `'static` future that yields one [`EncodedPage`] (or an error).
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;

/// Encodes the data of one field into pages and columns.
///
/// `encode_batch` in this module drives an encoder as follows:
/// 1. one `maybe_encode` call with the field's whole array;
/// 2. `flush`;
/// 3. `finish`.
///
/// The resulting pages and columns are written into a single buffer.
pub trait FieldEncoder: Send {
    /// Hands `array` to the encoder and returns whatever page-encoding tasks it
    /// produced. The name suggests an implementation may buffer data and return
    /// no tasks yet — NOTE(review): confirm against implementations.
    ///
    /// * `external_buffers` — sink for buffers stored outside of pages; adding a
    ///   buffer returns the position it will occupy.
    /// * `repdef` — presumably the repetition/definition-level builder for this
    ///   array (`encode_batch` passes `RepDefBuilder::default()`).
    /// * `row_number`, `num_rows` — `encode_batch` passes `0` and the array's
    ///   length. Presumably these are the array's starting row and row count —
    ///   TODO confirm.
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<EncodeTask>>;
    /// Returns tasks for data the encoder is still holding. `encode_batch` calls
    /// this right after `maybe_encode`.
    fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>>;
    /// Completes encoding and returns the column-level results.
    ///
    /// `encode_batch` treats the returned vector as one entry per column, in
    /// column-index order.
    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>>;

    /// Number of columns written by this encoder (summed by `BatchEncoder::num_columns`).
    fn num_columns(&self) -> u32;
}
193
/// Allocates consecutive column indices and remembers which field each
/// allocated index was assigned to.
#[derive(Debug, Default)]
pub struct ColumnIndexSequence {
    // Index that the next allocation (or skip) will consume.
    current_index: u32,
    // `(field_id, column_index)` pairs in allocation order; skipped indices are absent.
    mapping: Vec<(u32, u32)>,
}

impl ColumnIndexSequence {
    /// Consumes the current index and returns it.
    fn advance(&mut self) -> u32 {
        let claimed = self.current_index;
        self.current_index += 1;
        claimed
    }

    /// Allocates the next column index for `field_id`, records the pairing,
    /// and returns the index.
    pub fn next_column_index(&mut self, field_id: u32) -> u32 {
        let claimed = self.advance();
        self.mapping.push((field_id, claimed));
        claimed
    }

    /// Consumes one column index without associating it with any field.
    pub fn skip(&mut self) {
        self.advance();
    }
}
214
/// Tunables passed to every field encoder created for a write.
pub struct EncodingOptions {
    /// Presumably the number of bytes an encoder may buffer per column before
    /// producing a page — TODO confirm against the encoders. Default: 8 MiB.
    pub cache_bytes_per_column: u64,
    /// Presumably the upper bound on the size of a single page — TODO confirm.
    /// Default: 32 MiB.
    pub max_page_bytes: u64,
    /// Passed to the list and struct encoders. `encode_batch` always forces it
    /// to `true`. Default: `true`.
    pub keep_original_array: bool,
    /// Alignment used when reserving space for out-of-line buffers.
    ///
    /// `encode_batch` rejects values that are not a power of two or are below
    /// [`MIN_PAGE_BUFFER_ALIGNMENT`]. Default: 64.
    pub buffer_alignment: u64,
}

impl Default for EncodingOptions {
    fn default() -> Self {
        const MIB: u64 = 1 << 20;
        Self {
            buffer_alignment: 64,
            keep_original_array: true,
            max_page_bytes: 32 * MIB,
            cache_bytes_per_column: 8 * MIB,
        }
    }
}
246
/// Decides which [`FieldEncoder`] to use for a field.
pub trait FieldEncodingStrategy: Send + Sync + std::fmt::Debug {
    /// Builds the encoder for `field`.
    ///
    /// * `encoding_strategy_root` — the top-level strategy. `BatchEncoder::try_new`
    ///   passes the strategy itself. Presumably nested strategies use it to
    ///   delegate child fields back to the root — TODO confirm.
    /// * `column_index` — allocator from which the encoder claims its column
    ///   indices; the allocations are later exposed as
    ///   `BatchEncoder::field_id_to_column_index`.
    /// * `options` — encoding tunables.
    fn create_field_encoder(
        &self,
        encoding_strategy_root: &dyn FieldEncodingStrategy,
        field: &Field,
        column_index: &mut ColumnIndexSequence,
        options: &EncodingOptions,
    ) -> Result<Box<dyn FieldEncoder>>;
}
272
273pub fn default_encoding_strategy(version: LanceFileVersion) -> Box<dyn FieldEncodingStrategy> {
274 match version.resolve() {
275 LanceFileVersion::Legacy => panic!(),
276 LanceFileVersion::V2_0 => {
277 Box::new(crate::v2::encoder::CoreFieldEncodingStrategy::default())
278 }
279 _ => Box::new(StructuralEncodingStrategy::default()),
280 }
281}
282
/// Field-encoding strategy returned by `default_encoding_strategy` for every
/// resolved version other than `Legacy` and `V2_0`.
#[derive(Debug)]
pub struct StructuralEncodingStrategy {
    /// Compression strategy handed to every `PrimitiveStructuralEncoder` this
    /// strategy creates.
    pub compression_strategy: Arc<dyn CompressionStrategy>,
    /// File version this strategy targets.
    ///
    /// NOTE(review): not read by the code visible in this file.
    pub version: LanceFileVersion,
}
289
290#[allow(clippy::derivable_impls)]
293impl Default for StructuralEncodingStrategy {
294 fn default() -> Self {
295 Self {
296 compression_strategy: Arc::<DefaultCompressionStrategy>::default(),
297 version: LanceFileVersion::default(),
298 }
299 }
300}
301
302impl StructuralEncodingStrategy {
303 fn is_primitive_type(data_type: &DataType) -> bool {
304 matches!(
305 data_type,
306 DataType::Boolean
307 | DataType::Date32
308 | DataType::Date64
309 | DataType::Decimal128(_, _)
310 | DataType::Decimal256(_, _)
311 | DataType::Duration(_)
312 | DataType::Float16
313 | DataType::Float32
314 | DataType::Float64
315 | DataType::Int16
316 | DataType::Int32
317 | DataType::Int64
318 | DataType::Int8
319 | DataType::Interval(_)
320 | DataType::Null
321 | DataType::Time32(_)
322 | DataType::Time64(_)
323 | DataType::Timestamp(_, _)
324 | DataType::UInt16
325 | DataType::UInt32
326 | DataType::UInt64
327 | DataType::UInt8
328 | DataType::FixedSizeBinary(_)
329 | DataType::FixedSizeList(_, _)
330 | DataType::Binary
331 | DataType::LargeBinary
332 | DataType::Utf8
333 | DataType::LargeUtf8,
334 )
335 }
336
337 fn do_create_field_encoder(
338 &self,
339 _encoding_strategy_root: &dyn FieldEncodingStrategy,
340 field: &Field,
341 column_index: &mut ColumnIndexSequence,
342 options: &EncodingOptions,
343 root_field_metadata: &HashMap<String, String>,
344 ) -> Result<Box<dyn FieldEncoder>> {
345 let data_type = field.data_type();
346 if Self::is_primitive_type(&data_type) {
347 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
348 options,
349 self.compression_strategy.clone(),
350 column_index.next_column_index(field.id as u32),
351 field.clone(),
352 Arc::new(root_field_metadata.clone()),
353 )?))
354 } else {
355 match data_type {
356 DataType::List(_) | DataType::LargeList(_) => {
357 let child = field.children.first().expect("List should have a child");
358 let child_encoder = self.do_create_field_encoder(
359 _encoding_strategy_root,
360 child,
361 column_index,
362 options,
363 root_field_metadata,
364 )?;
365 Ok(Box::new(ListStructuralEncoder::new(
366 options.keep_original_array,
367 child_encoder,
368 )))
369 }
370 DataType::Struct(_) => {
371 if field.is_packed_struct() {
372 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
373 options,
374 self.compression_strategy.clone(),
375 column_index.next_column_index(field.id as u32),
376 field.clone(),
377 Arc::new(root_field_metadata.clone()),
378 )?))
379 } else {
380 let children_encoders = field
381 .children
382 .iter()
383 .map(|field| {
384 self.do_create_field_encoder(
385 _encoding_strategy_root,
386 field,
387 column_index,
388 options,
389 root_field_metadata,
390 )
391 })
392 .collect::<Result<Vec<_>>>()?;
393 Ok(Box::new(StructStructuralEncoder::new(
394 options.keep_original_array,
395 children_encoders,
396 )))
397 }
398 }
399 DataType::Dictionary(_, value_type) => {
400 if Self::is_primitive_type(&value_type) {
402 Ok(Box::new(PrimitiveStructuralEncoder::try_new(
403 options,
404 self.compression_strategy.clone(),
405 column_index.next_column_index(field.id as u32),
406 field.clone(),
407 Arc::new(root_field_metadata.clone()),
408 )?))
409 } else {
410 Err(Error::NotSupported { source: format!("cannot encode a dictionary column whose value type is a logical type ({})", value_type).into(), location: location!() })
416 }
417 }
418 _ => todo!("Implement encoding for field {}", field),
419 }
420 }
421 }
422}
423
424impl FieldEncodingStrategy for StructuralEncodingStrategy {
425 fn create_field_encoder(
426 &self,
427 encoding_strategy_root: &dyn FieldEncodingStrategy,
428 field: &Field,
429 column_index: &mut ColumnIndexSequence,
430 options: &EncodingOptions,
431 ) -> Result<Box<dyn FieldEncoder>> {
432 self.do_create_field_encoder(
433 encoding_strategy_root,
434 field,
435 column_index,
436 options,
437 &field.metadata,
438 )
439 }
440}
441
/// The set of field encoders needed to encode batches of one schema.
pub struct BatchEncoder {
    /// One encoder per top-level schema field, in schema order.
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    /// `(field_id, column_index)` pairs for every column index allocated while
    /// building the encoders (nested fields included), in allocation order.
    pub field_id_to_column_index: Vec<(u32, u32)>,
}
448
449impl BatchEncoder {
450 pub fn try_new(
451 schema: &Schema,
452 strategy: &dyn FieldEncodingStrategy,
453 options: &EncodingOptions,
454 ) -> Result<Self> {
455 let mut col_idx = 0;
456 let mut col_idx_sequence = ColumnIndexSequence::default();
457 let field_encoders = schema
458 .fields
459 .iter()
460 .map(|field| {
461 let encoder = strategy.create_field_encoder(
462 strategy,
463 field,
464 &mut col_idx_sequence,
465 options,
466 )?;
467 col_idx += encoder.as_ref().num_columns();
468 Ok(encoder)
469 })
470 .collect::<Result<Vec<_>>>()?;
471 Ok(Self {
472 field_encoders,
473 field_id_to_column_index: col_idx_sequence.mapping,
474 })
475 }
476
477 pub fn num_columns(&self) -> u32 {
478 self.field_encoders
479 .iter()
480 .map(|field_encoder| field_encoder.num_columns())
481 .sum::<u32>()
482 }
483}
484
/// A record batch encoded entirely in memory by [`encode_batch`].
#[derive(Debug)]
pub struct EncodedBatch {
    /// All page, column, and out-of-line buffers, concatenated into one buffer.
    pub data: Bytes,
    /// One entry per encoded column: offsets of its column buffers and pages within `data`.
    pub page_table: Vec<Arc<ColumnInfo>>,
    /// The schema passed to `encode_batch`, stored unchanged.
    pub schema: Arc<Schema>,
    /// Filled by `encode_batch` from `BatchEncoder::field_id_to_column_index`.
    ///
    /// NOTE(review): that list holds every allocated column index, nested
    /// fields included, not only top-level ones.
    pub top_level_columns: Vec<u32>,
    /// Number of rows in the encoded batch.
    pub num_rows: u64,
}
496
497fn write_page_to_data_buffer(page: EncodedPage, data_buffer: &mut BytesMut) -> PageInfo {
498 let buffers = page.data;
499 let mut buffer_offsets_and_sizes = Vec::with_capacity(buffers.len());
500 for buffer in buffers {
501 let buffer_offset = data_buffer.len() as u64;
502 data_buffer.extend_from_slice(&buffer);
503 let size = data_buffer.len() as u64 - buffer_offset;
504 buffer_offsets_and_sizes.push((buffer_offset, size));
505 }
506
507 PageInfo {
508 buffer_offsets_and_sizes: Arc::from(buffer_offsets_and_sizes),
509 encoding: page.description,
510 num_rows: page.num_rows,
511 priority: page.row_number,
512 }
513}
514
515pub async fn encode_batch(
520 batch: &RecordBatch,
521 schema: Arc<Schema>,
522 encoding_strategy: &dyn FieldEncodingStrategy,
523 options: &EncodingOptions,
524) -> Result<EncodedBatch> {
525 if !is_pwr_two(options.buffer_alignment) || options.buffer_alignment < MIN_PAGE_BUFFER_ALIGNMENT
526 {
527 return Err(Error::InvalidInput {
528 source: format!(
529 "buffer_alignment must be a power of two and at least {}",
530 MIN_PAGE_BUFFER_ALIGNMENT
531 )
532 .into(),
533 location: location!(),
534 });
535 }
536
537 let mut data_buffer = BytesMut::new();
538 let lance_schema = Schema::try_from(batch.schema().as_ref())?;
539 let options = EncodingOptions {
540 keep_original_array: true,
541 ..*options
542 };
543 let batch_encoder = BatchEncoder::try_new(&lance_schema, encoding_strategy, &options)?;
544 let mut page_table = Vec::new();
545 let mut col_idx_offset = 0;
546 for (arr, mut encoder) in batch.columns().iter().zip(batch_encoder.field_encoders) {
547 let mut external_buffers =
548 OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
549 let repdef = RepDefBuilder::default();
550 let encoder = encoder.as_mut();
551 let num_rows = arr.len() as u64;
552 let mut tasks =
553 encoder.maybe_encode(arr.clone(), &mut external_buffers, repdef, 0, num_rows)?;
554 tasks.extend(encoder.flush(&mut external_buffers)?);
555 for buffer in external_buffers.take_buffers() {
556 data_buffer.extend_from_slice(&buffer);
557 }
558 let mut pages = HashMap::<u32, Vec<PageInfo>>::new();
559 for task in tasks {
560 let encoded_page = task.await?;
561 pages
563 .entry(encoded_page.column_idx)
564 .or_default()
565 .push(write_page_to_data_buffer(encoded_page, &mut data_buffer));
566 }
567 let mut external_buffers =
568 OutOfLineBuffers::new(data_buffer.len() as u64, options.buffer_alignment);
569 let encoded_columns = encoder.finish(&mut external_buffers).await?;
570 for buffer in external_buffers.take_buffers() {
571 data_buffer.extend_from_slice(&buffer);
572 }
573 let num_columns = encoded_columns.len();
574 for (col_idx, encoded_column) in encoded_columns.into_iter().enumerate() {
575 let col_idx = col_idx + col_idx_offset;
576 let mut col_buffer_offsets_and_sizes = Vec::new();
577 for buffer in encoded_column.column_buffers {
578 let buffer_offset = data_buffer.len() as u64;
579 data_buffer.extend_from_slice(&buffer);
580 let size = data_buffer.len() as u64 - buffer_offset;
581 col_buffer_offsets_and_sizes.push((buffer_offset, size));
582 }
583 for page in encoded_column.final_pages {
584 pages
585 .entry(page.column_idx)
586 .or_default()
587 .push(write_page_to_data_buffer(page, &mut data_buffer));
588 }
589 let col_pages = std::mem::take(pages.entry(col_idx as u32).or_default());
590 page_table.push(Arc::new(ColumnInfo {
591 index: col_idx as u32,
592 buffer_offsets_and_sizes: Arc::from(
593 col_buffer_offsets_and_sizes.into_boxed_slice(),
594 ),
595 page_infos: Arc::from(col_pages.into_boxed_slice()),
596 encoding: encoded_column.encoding,
597 }))
598 }
599 col_idx_offset += num_columns;
600 }
601 let top_level_columns = batch_encoder
602 .field_id_to_column_index
603 .iter()
604 .map(|(_, idx)| *idx)
605 .collect();
606 Ok(EncodedBatch {
607 data: data_buffer.freeze(),
608 top_level_columns,
609 page_table,
610 schema,
611 num_rows: batch.num_rows() as u64,
612 })
613}