// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
use arrow_array::ArrayRef;
use arrow_buffer::Buffer;
use arrow_schema::DataType;
use futures::future::BoxFuture;
use lance_core::datatypes::{Field, Schema};
use lance_core::Result;
use crate::{
encodings::logical::{
binary::BinaryFieldEncoder, list::ListFieldEncoder, primitive::PrimitiveFieldEncoder,
r#struct::StructFieldEncoder,
},
format::pb,
};
/// An encoded buffer
///
/// Logically one contiguous region of the output file; physically it may be
/// assembled from several in-memory pieces to avoid copying.
pub struct EncodedBuffer {
    /// Buffers that make up the encoded buffer
    ///
    /// All of these buffers should be written to the file as one contiguous buffer
    ///
    /// This is a Vec to allow for zero-copy
    ///
    /// For example, if we are asked to write 3 primitive arrays of 1000 rows and we can write them all
    /// as one page then this will be the value buffers from the 3 primitive arrays
    pub parts: Vec<Buffer>,
}
// Custom impl because buffers shouldn't be included in debug output
impl std::fmt::Debug for EncodedBuffer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Summarize as the total byte length across all parts; dumping the
        // raw buffer contents would flood the debug output.
        let total_len: usize = self.parts.iter().map(|part| part.len()).sum();
        f.debug_struct("EncodedBuffer")
            .field("len", &total_len)
            .finish()
    }
}
/// An encoded buffer that belongs to a page, identified by its position
/// (buffer index) within that page
pub struct EncodedArrayBuffer {
    /// The data making up the buffer
    pub parts: Vec<Buffer>,
    /// The index of the buffer in the page
    pub index: u32,
}
// Custom impl because buffers shouldn't be included in debug output
impl std::fmt::Debug for EncodedArrayBuffer {
    /// Formats the buffer as its total byte length plus its index in the page.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fix: the struct name was previously reported as "EncodedBuffer",
        // making debug output indistinguishable from that other type.
        f.debug_struct("EncodedArrayBuffer")
            .field("len", &self.parts.iter().map(|p| p.len()).sum::<usize>())
            .field("index", &self.index)
            .finish()
    }
}
/// An encoded array
///
/// Maps to a single Arrow array
///
/// This may contain multiple buffers. For example, a nullable int32 array will contain two buffers,
/// one for the null bitmap and one for the values
#[derive(Debug)]
pub struct EncodedArray {
    /// The encoded buffers
    pub buffers: Vec<EncodedArrayBuffer>,
    /// A description of the encoding used to encode the array
    ///
    /// Stored as a protobuf message so it can be written into the file
    /// metadata and used to decode the data later.
    pub encoding: pb::ArrayEncoding,
}
/// An encoded page of data
///
/// Maps to a top-level array
///
/// For example, FixedSizeList<Int32> will have two EncodedArray instances and one EncodedPage
#[derive(Debug)]
pub struct EncodedPage {
    /// The encoded array data
    pub array: EncodedArray,
    /// The number of rows in the encoded page
    pub num_rows: u32,
    /// The index of the column
    pub column_idx: u32,
}
/// Encodes data into a single buffer
pub trait BufferEncoder: std::fmt::Debug + Send + Sync {
    /// Encode data
    ///
    /// This method may receive multiple chunks and should encode them all into
    /// a single EncodedBuffer (though that buffer may have multiple parts). All
    /// parts will be written to the file as one contiguous block.
    ///
    /// # Errors
    ///
    /// Returns an error if the arrays cannot be encoded.
    fn encode(&self, arrays: &[ArrayRef]) -> Result<EncodedBuffer>;
}
/// Encodes data from Arrow format into some kind of on-disk format
///
/// The encoder is responsible for looking at the incoming data and determining
/// which encoding is most appropriate. This may involve calculating statistics,
/// etc. It then needs to actually encode that data according to the chosen encoding.
///
/// The encoder may even encode the statistics as well (typically in the column
/// metadata) so that the statistics can be used for filtering later.
///
/// The array encoder must be Send + Sync. Encoding is always done on its own
/// thread task in the background and there could potentially be multiple encode
/// tasks running for a column at once.
///
/// Note: not all Arrow arrays can be encoded using an ArrayEncoder. Some arrays
/// will be encoded into several Lance columns. For example, a list array or a
/// struct array. See [FieldEncoder] for the top-level encoding entry point
pub trait ArrayEncoder: std::fmt::Debug + Send + Sync {
    /// Encode data
    ///
    /// This method may receive multiple chunks and should encode them into a
    /// single EncodedPage.
    ///
    /// `buffer_index` is the next free buffer position in the page; the encoder
    /// advances it for each buffer it claims.
    ///
    /// The result should contain a description of the encoding that was chosen.
    /// This can be used to decode the data later.
    fn encode(&self, arrays: &[ArrayRef], buffer_index: &mut u32) -> Result<EncodedArray>;
}
/// A task to create a page of data
///
/// Resolves to an [`EncodedPage`] once the background encoding completes.
pub type EncodeTask = BoxFuture<'static, Result<EncodedPage>>;
/// Top level encoding trait to code any Arrow array type into one or more pages.
///
/// The field encoder implements buffering and encoding of a single input column
/// but it may map to multiple output columns. For example, a list array or struct
/// array will be encoded into multiple columns.
///
/// Also, fields may be encoded at different speeds. For example, given a struct
/// column with three fields (a boolean field, an int32 field, and a 4096-dimension
/// tensor field) the tensor field is likely to emit encoded pages much more frequently
/// than the boolean field.
pub trait FieldEncoder: Send {
    /// Buffer the data and, if there is enough data in the buffer to form a page, return
    /// an encoding task to encode the data.
    ///
    /// This may return more than one task because a single column may be mapped to multiple
    /// output columns. For example, if encoding a struct column with three children then
    /// up to three tasks may be returned from each call to maybe_encode.
    ///
    /// It may also return multiple tasks for a single column if the input array is larger
    /// than a single disk page.
    ///
    /// It could also return an empty Vec if there is not enough data yet to encode any pages.
    fn maybe_encode(&mut self, array: ArrayRef) -> Result<Vec<EncodeTask>>;
    /// Flush any remaining data from the buffers into encoding tasks
    fn flush(&mut self) -> Result<Vec<EncodeTask>>;
    /// The number of output columns this encoding will create
    fn num_columns(&self) -> u32;
}
/// Encodes all top-level fields of a schema, one [`FieldEncoder`] per field
pub struct BatchEncoder {
    /// One encoder for each top-level field in the schema
    pub field_encoders: Vec<Box<dyn FieldEncoder>>,
    /// Maps a field id to the index of the first output column that field occupies
    pub field_id_to_column_index: Vec<(i32, i32)>,
}
impl BatchEncoder {
pub(crate) fn get_encoder_for_field(
field: &Field,
cache_bytes_per_column: u64,
col_idx: &mut u32,
field_col_mapping: &mut Vec<(i32, i32)>,
) -> Result<Box<dyn FieldEncoder>> {
match field.data_type() {
DataType::Boolean
| DataType::Date32
| DataType::Date64
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
| DataType::Duration(_)
| DataType::Float16
| DataType::Float32
| DataType::Float64
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::Int8
| DataType::Interval(_)
| DataType::Null
| DataType::RunEndEncoded(_, _)
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::UInt8
| DataType::FixedSizeList(_, _) => {
let my_col_idx = *col_idx;
*col_idx += 1;
field_col_mapping.push((field.id, my_col_idx as i32));
Ok(Box::new(PrimitiveFieldEncoder::try_new(
cache_bytes_per_column,
&field.data_type(),
my_col_idx,
)?))
}
DataType::List(_) => {
let my_col_idx = *col_idx;
field_col_mapping.push((field.id, my_col_idx as i32));
*col_idx += 1;
let inner_encoding = Self::get_encoder_for_field(
&field.children[0],
cache_bytes_per_column,
col_idx,
field_col_mapping,
)?;
Ok(Box::new(ListFieldEncoder::new(
inner_encoding,
cache_bytes_per_column,
my_col_idx,
)))
}
DataType::Struct(_) => {
let header_col_idx = *col_idx;
field_col_mapping.push((field.id, header_col_idx as i32));
*col_idx += 1;
let children_encoders = field
.children
.iter()
.map(|field| {
Self::get_encoder_for_field(
field,
cache_bytes_per_column,
col_idx,
field_col_mapping,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(StructFieldEncoder::new(
children_encoders,
header_col_idx,
)))
}
DataType::Utf8 | DataType::Binary => {
let my_col_idx = *col_idx;
field_col_mapping.push((field.id, my_col_idx as i32));
*col_idx += 2;
Ok(Box::new(BinaryFieldEncoder::new(
cache_bytes_per_column,
my_col_idx,
)))
}
_ => todo!("Implement encoding for data type {}", field.data_type()),
}
}
pub fn try_new(schema: &Schema, cache_bytes_per_column: u64) -> Result<Self> {
let mut col_idx = 0;
let mut field_col_mapping = Vec::new();
let field_encoders = schema
.fields
.iter()
.map(|field| {
Self::get_encoder_for_field(
field,
cache_bytes_per_column,
&mut col_idx,
&mut field_col_mapping,
)
})
.collect::<Result<Vec<_>>>()?;
Ok(Self {
field_encoders,
field_id_to_column_index: field_col_mapping,
})
}
pub fn num_columns(&self) -> u32 {
self.field_encoders
.iter()
.map(|field_encoder| field_encoder.num_columns())
.sum::<u32>()
}
}