use std::{fmt::Debug, iter, ops::Range, sync::Arc, vec};
use arrow::array::AsArray;
use arrow_array::{make_array, Array, ArrayRef};
use arrow_buffer::{bit_util, BooleanBuffer, NullBuffer};
use arrow_schema::DataType;
use futures::{future::BoxFuture, FutureExt};
use lance_arrow::deepcopy::deep_copy_array;
use log::{debug, trace};
use snafu::{location, Location};
use lance_core::{datatypes::Field, utils::tokio::spawn_cpu, Result};
use crate::{
buffer::LanceBuffer,
data::{BlockInfo, DataBlock, FixedWidthDataBlock, UsedEncoding},
decoder::{
DecodeArrayTask, FieldScheduler, FilterExpression, LogicalPageDecoder, NextDecodeTask,
PageEncoding, PageInfo, PageScheduler, PrimitivePageDecoder, PriorityRange,
ScheduledScanLine, SchedulerContext, SchedulingJob,
},
encoder::{
ArrayEncodingStrategy, CompressionStrategy, EncodeTask, EncodedColumn, EncodedPage,
EncodingOptions, FieldEncoder, MiniBlockChunk, MiniBlockCompressed, OutOfLineBuffers,
},
encodings::physical::{decoder_from_array_encoding, ColumnBuffers, PageBuffers},
format::{pb, ProtobufUtils},
repdef::{LevelBuffer, RepDefBuilder},
};
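/// A page of primitive data within a column: the page's physical scheduler,
/// its row count, and its index within the column.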
#[derive(Debug)]
struct PrimitivePage {
scheduler: Box<dyn PageScheduler>,
num_rows: u64,
page_index: u32,
}
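/// A field scheduler for primitive (leaf) fields
///
/// Delegates the actual scheduling of I/O to a per-page [`PageScheduler`].
/// Empty pages are filtered out when the scheduler is created.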
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
data_type: DataType,
page_schedulers: Vec<PrimitivePage>,
num_rows: u64,
should_validate: bool,
column_index: u32,
}
impl PrimitiveFieldScheduler {
pub fn new(
column_index: u32,
data_type: DataType,
pages: Arc<[PageInfo]>,
buffers: ColumnBuffers,
should_validate: bool,
) -> Self {
let page_schedulers = pages
.iter()
.enumerate()
            .filter(|(page_index, page)| {
                // Only log when we actually skip an empty page
                if page.num_rows == 0 {
                    trace!("Skipping empty page with index {}", page_index);
                }
                page.num_rows > 0
            })
.map(|(page_index, page)| {
let page_buffers = PageBuffers {
column_buffers: buffers,
positions_and_sizes: &page.buffer_offsets_and_sizes,
};
let scheduler = decoder_from_array_encoding(
page.encoding.as_legacy(),
&page_buffers,
&data_type,
);
PrimitivePage {
scheduler,
num_rows: page.num_rows,
page_index: page_index as u32,
}
})
.collect::<Vec<_>>();
let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
Self {
data_type,
page_schedulers,
num_rows,
should_validate,
column_index,
}
}
}
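/// A job that schedules the requested ranges one page at a time
///
/// `global_row_offset` tracks the row at which the current page begins while
/// `range_idx` and `range_offset` track progress through the requested ranges.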
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
scheduler: &'a PrimitiveFieldScheduler,
ranges: Vec<Range<u64>>,
page_idx: usize,
range_idx: usize,
range_offset: u64,
global_row_offset: u64,
}
impl<'a> PrimitiveFieldSchedulingJob<'a> {
pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
Self {
scheduler,
ranges,
page_idx: 0,
range_idx: 0,
range_offset: 0,
global_row_offset: 0,
}
}
}
impl<'a> SchedulingJob for PrimitiveFieldSchedulingJob<'a> {
fn schedule_next(
&mut self,
context: &mut SchedulerContext,
priority: &dyn PriorityRange,
) -> Result<ScheduledScanLine> {
debug_assert!(self.range_idx < self.ranges.len());
let mut range = self.ranges[self.range_idx].clone();
range.start += self.range_offset;
let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
trace!(
"Current range is {:?} and current page has {} rows",
range,
cur_page.num_rows
);
        // Skip any pages that end before the start of the current range
        while cur_page.num_rows + self.global_row_offset <= range.start {
self.global_row_offset += cur_page.num_rows;
self.page_idx += 1;
trace!("Skipping entire page of {} rows", cur_page.num_rows);
cur_page = &self.scheduler.page_schedulers[self.page_idx];
}
        let mut ranges_in_page = Vec::new();
        // The current page overlaps `range`; keep collecting ranges until one
        // extends past the end of this page
        while cur_page.num_rows + self.global_row_offset > range.start {
range.start = range.start.max(self.global_row_offset);
let start_in_page = range.start - self.global_row_offset;
let end_in_page = start_in_page + (range.end - range.start);
let end_in_page = end_in_page.min(cur_page.num_rows);
let last_in_range = (end_in_page + self.global_row_offset) >= range.end;
ranges_in_page.push(start_in_page..end_in_page);
if last_in_range {
self.range_idx += 1;
if self.range_idx == self.ranges.len() {
break;
}
range = self.ranges[self.range_idx].clone();
} else {
break;
}
}
let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
trace!(
"Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
num_rows_in_next,
ranges_in_page.len(),
cur_page.num_rows,
priority.current_priority(),
self.scheduler.column_index,
cur_page.page_index,
);
        // This page is now fully scheduled; the next call starts at the next page
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;
let physical_decoder = cur_page.scheduler.schedule_ranges(
&ranges_in_page,
context.io(),
priority.current_priority(),
);
let logical_decoder = PrimitiveFieldDecoder {
data_type: self.scheduler.data_type.clone(),
column_index: self.scheduler.column_index,
unloaded_physical_decoder: Some(physical_decoder),
physical_decoder: None,
rows_drained: 0,
num_rows: num_rows_in_next,
should_validate: self.scheduler.should_validate,
page_index: cur_page.page_index,
};
let decoder = Box::new(logical_decoder);
let decoder_ready = context.locate_decoder(decoder);
Ok(ScheduledScanLine {
decoders: vec![decoder_ready],
rows_scheduled: num_rows_in_next,
})
}
fn num_rows(&self) -> u64 {
self.ranges.iter().map(|r| r.end - r.start).sum()
}
}
impl FieldScheduler for PrimitiveFieldScheduler {
fn num_rows(&self) -> u64 {
self.num_rows
}
fn schedule_ranges<'a>(
&'a self,
ranges: &[std::ops::Range<u64>],
_filter: &FilterExpression,
) -> Result<Box<dyn SchedulingJob + 'a>> {
Ok(Box::new(PrimitiveFieldSchedulingJob::new(
self,
ranges.to_vec(),
)))
}
fn initialize<'a>(
&'a self,
_filter: &'a FilterExpression,
_context: &'a SchedulerContext,
) -> BoxFuture<'a, Result<()>> {
        // Primitive fields have nothing to initialize
        std::future::ready(Ok(())).boxed()
}
}
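/// A logical decoder for a page of primitive values
///
/// The physical decoder begins as an unresolved future (`unloaded_physical_decoder`)
/// and must be resolved via `wait_for_loaded` before any rows can be drained.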
pub struct PrimitiveFieldDecoder {
data_type: DataType,
unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
should_validate: bool,
num_rows: u64,
rows_drained: u64,
column_index: u32,
page_index: u32,
}
impl PrimitiveFieldDecoder {
pub fn new_from_data(
physical_decoder: Arc<dyn PrimitivePageDecoder>,
data_type: DataType,
num_rows: u64,
should_validate: bool,
) -> Self {
Self {
data_type,
unloaded_physical_decoder: None,
physical_decoder: Some(physical_decoder),
should_validate,
num_rows,
rows_drained: 0,
column_index: u32::MAX,
page_index: u32::MAX,
}
}
}
impl Debug for PrimitiveFieldDecoder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrimitiveFieldDecoder")
.field("data_type", &self.data_type)
.field("num_rows", &self.num_rows)
.field("rows_drained", &self.rows_drained)
.finish()
}
}
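/// A task that decodes a contiguous run of rows from a loaded physical page
/// into an Arrow array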
struct PrimitiveFieldDecodeTask {
rows_to_skip: u64,
rows_to_take: u64,
should_validate: bool,
physical_decoder: Arc<dyn PrimitivePageDecoder>,
data_type: DataType,
}
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
fn decode(self: Box<Self>) -> Result<ArrayRef> {
let block = self
.physical_decoder
.decode(self.rows_to_skip, self.rows_to_take)?;
let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);
        // Dictionaries are a special case: attach any logical nulls to the
        // indices so the rebuilt array carries the validity
        if let DataType::Dictionary(_, _) = self.data_type {
let dict = array.as_any_dictionary();
if let Some(nulls) = array.logical_nulls() {
let new_indices = dict.keys().to_data();
let new_array = make_array(
new_indices
.into_builder()
.nulls(Some(nulls))
.add_child_data(dict.values().to_data())
.data_type(dict.data_type().clone())
.build()?,
);
return Ok(new_array);
}
}
Ok(array)
}
}
impl LogicalPageDecoder for PrimitiveFieldDecoder {
fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
        trace!(
"primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
loaded_need,
self.column_index,
self.page_index,
self.num_rows
);
async move {
let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
self.physical_decoder = Some(Arc::from(physical_decoder));
Ok(())
}
.boxed()
}
fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        if self.physical_decoder.is_none() {
return Err(lance_core::Error::Internal {
message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
location: location!(),
});
}
let rows_to_skip = self.rows_drained;
let rows_to_take = num_rows;
self.rows_drained += rows_to_take;
let task = Box::new(PrimitiveFieldDecodeTask {
rows_to_skip,
rows_to_take,
should_validate: self.should_validate,
physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
data_type: self.data_type.clone(),
});
Ok(NextDecodeTask {
task,
num_rows: rows_to_take,
has_more: self.rows_drained != self.num_rows,
})
}
fn rows_loaded(&self) -> u64 {
if self.unloaded_physical_decoder.is_some() {
0
} else {
self.num_rows
}
}
fn rows_drained(&self) -> u64 {
if self.unloaded_physical_decoder.is_some() {
0
} else {
self.rows_drained
}
}
fn num_rows(&self) -> u64 {
self.num_rows
}
fn data_type(&self) -> &DataType {
&self.data_type
}
}
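/// Buffers incoming arrays for a column until enough bytes have accumulated to
/// justify flushing a page
///
/// `row_number` is the row number of the first accumulated array; `u64::MAX`
/// serves as the "unset" sentinel. An illustrative sketch (the sizes and row
/// numbers here are hypothetical):
///
/// ```ignore
/// let mut queue = AccumulationQueue::new(8 * 1024 * 1024, /*column_index=*/ 0, false);
/// if let Some((arrays, first_row)) = queue.insert(array, 42) {
///     // Enough data is buffered; encode `arrays` into a page starting at `first_row`
/// }
/// ```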
#[derive(Debug)]
pub struct AccumulationQueue {
cache_bytes: u64,
keep_original_array: bool,
buffered_arrays: Vec<ArrayRef>,
current_bytes: u64,
row_number: u64,
column_index: u32,
}
impl AccumulationQueue {
pub fn new(cache_bytes: u64, column_index: u32, keep_original_array: bool) -> Self {
Self {
cache_bytes,
buffered_arrays: Vec::new(),
current_bytes: 0,
column_index,
keep_original_array,
row_number: u64::MAX,
}
}
pub fn insert(&mut self, array: ArrayRef, row_number: u64) -> Option<(Vec<ArrayRef>, u64)> {
if self.row_number == u64::MAX {
self.row_number = row_number;
}
self.current_bytes += array.get_array_memory_size() as u64;
if self.current_bytes > self.cache_bytes {
debug!(
"Flushing column {} page of size {} bytes (unencoded)",
self.column_index, self.current_bytes
);
            // No deep copy needed here; the buffer is flushed immediately
            self.buffered_arrays.push(array);
self.current_bytes = 0;
let row_number = self.row_number;
self.row_number = u64::MAX;
Some((std::mem::take(&mut self.buffered_arrays), row_number))
} else {
trace!(
"Accumulating data for column {}. Now at {} bytes",
self.column_index,
self.current_bytes
);
if self.keep_original_array {
self.buffered_arrays.push(array);
} else {
self.buffered_arrays.push(deep_copy_array(array.as_ref()))
}
None
}
}
pub fn flush(&mut self) -> Option<(Vec<ArrayRef>, u64)> {
if self.buffered_arrays.is_empty() {
trace!(
"No final flush since no data at column {}",
self.column_index
);
None
} else {
trace!(
"Final flush of column {} which has {} bytes",
self.column_index,
self.current_bytes
);
            self.current_bytes = 0;
            let row_number = self.row_number;
            // Reset to the "unset" sentinel (matching `insert`) so the next
            // insert records a fresh starting row number
            self.row_number = u64::MAX;
Some((std::mem::take(&mut self.buffered_arrays), row_number))
}
}
}
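/// A field encoder for primitive fields using the legacy page layout
///
/// Arrays accumulate in an [`AccumulationQueue`] and are encoded into pages
/// once enough bytes are buffered; a single oversized array is split so each
/// page stays near `max_page_bytes`.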
pub struct PrimitiveFieldEncoder {
accumulation_queue: AccumulationQueue,
array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
column_index: u32,
field: Field,
max_page_bytes: u64,
}
impl PrimitiveFieldEncoder {
pub fn try_new(
options: &EncodingOptions,
array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
column_index: u32,
field: Field,
) -> Result<Self> {
Ok(Self {
accumulation_queue: AccumulationQueue::new(
options.cache_bytes_per_column,
column_index,
options.keep_original_array,
),
column_index,
max_page_bytes: options.max_page_bytes,
array_encoding_strategy,
field,
})
}
fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
let encoder = self
.array_encoding_strategy
.create_array_encoder(&arrays, &self.field)?;
let column_idx = self.column_index;
let data_type = self.field.data_type();
Ok(tokio::task::spawn(async move {
let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
let data = DataBlock::from_arrays(&arrays, num_values);
let mut buffer_index = 0;
let array = encoder.encode(data, &data_type, &mut buffer_index)?;
let (data, description) = array.into_buffers();
Ok(EncodedPage {
data,
description: PageEncoding::Legacy(description),
num_rows: num_values,
column_idx,
                // The legacy encoding path does not track row numbers
                row_number: 0,
            })
        })
        // Propagate any panic from the spawned encode task
        .map(|res_res| res_res.unwrap())
        .boxed())
}
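    /// Creates encode tasks for the buffered arrays, splitting a single large
    /// array into multiple parts of roughly `max_page_bytes` each if needed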
fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
if arrays.len() == 1 {
let array = arrays.into_iter().next().unwrap();
let size_bytes = array.get_buffer_memory_size();
let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
let num_parts = num_parts.min(array.len());
if num_parts <= 1 {
Ok(vec![self.create_encode_task(vec![array])?])
} else {
let mut tasks = Vec::with_capacity(num_parts);
let mut offset = 0;
let part_size = bit_util::ceil(array.len(), num_parts);
for _ in 0..num_parts {
let avail = array.len() - offset;
let chunk_size = avail.min(part_size);
let part = array.slice(offset, chunk_size);
let task = self.create_encode_task(vec![part])?;
tasks.push(task);
offset += chunk_size;
}
Ok(tasks)
}
} else {
Ok(vec![self.create_encode_task(arrays)?])
}
}
}
impl FieldEncoder for PrimitiveFieldEncoder {
fn maybe_encode(
&mut self,
array: ArrayRef,
_external_buffers: &mut OutOfLineBuffers,
_repdef: RepDefBuilder,
_row_number: u64,
) -> Result<Vec<EncodeTask>> {
        // The legacy encoding path does not use row numbers, so pass 0
        if let Some(arrays) = self.accumulation_queue.insert(array, 0) {
Ok(self.do_flush(arrays.0)?)
} else {
Ok(vec![])
}
}
fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
if let Some(arrays) = self.accumulation_queue.flush() {
Ok(self.do_flush(arrays.0)?)
} else {
Ok(vec![])
}
}
fn num_columns(&self) -> u32 {
1
}
fn finish(
&mut self,
_external_buffers: &mut OutOfLineBuffers,
) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
}
}
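/// Mini-block chunk sizes are recorded in the chunk metadata in units of
/// `MINIBLOCK_SIZE_MULTIPLIER` bytes, so serialized chunks are zero-padded by
/// at most `MINIBLOCK_MAX_PADDING` bytes (see `serialize_miniblocks`).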
const MINIBLOCK_SIZE_MULTIPLIER: u64 = 6;
const MINIBLOCK_MAX_PADDING: u64 = MINIBLOCK_SIZE_MULTIPLIER - 1;
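/// A field encoder for primitive fields using the structural page layout
///
/// Incoming arrays and their repetition/definition level builders accumulate
/// until a flush, at which point narrow rows are encoded with the mini-block
/// layout (the full-zip layout for wide rows is not yet implemented).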
pub struct PrimitiveStructuralEncoder {
accumulation_queue: AccumulationQueue,
accumulated_repdefs: Vec<RepDefBuilder>,
compression_strategy: Arc<dyn CompressionStrategy>,
column_index: u32,
field: Field,
}
impl PrimitiveStructuralEncoder {
pub fn try_new(
options: &EncodingOptions,
compression_strategy: Arc<dyn CompressionStrategy>,
column_index: u32,
field: Field,
) -> Result<Self> {
Ok(Self {
accumulation_queue: AccumulationQueue::new(
options.cache_bytes_per_column,
column_index,
options.keep_original_array,
),
accumulated_repdefs: Vec::new(),
column_index,
compression_strategy,
field,
})
}
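    /// Heuristic for choosing the mini-block layout: rows that average fewer
    /// than 128 bytes are considered narrow.  For example (illustrative
    /// numbers), 1000 rows totaling 64 KiB average ~65.5 bytes per row and
    /// would be narrow.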
fn is_narrow(num_rows: u64, num_bytes: u64) -> bool {
let avg_bytes_per_row = num_bytes as f64 / num_rows as f64;
avg_bytes_per_row < 128.0
}
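    /// Serializes compressed mini-block chunks, together with their per-chunk
    /// repetition and definition level buffers, into one data buffer plus a
    /// parallel metadata buffer
    ///
    /// Each metadata entry is a little-endian u16: the upper 12 bits hold the
    /// chunk size in `MINIBLOCK_SIZE_MULTIPLIER`-byte units and the lower 4
    /// bits hold `log_num_values`.  As a worked example (illustrative
    /// numbers): a 601-byte chunk of 4096 values gives
    /// `divided_bytes = ceil(600 / 6) = 100`, so the metadata word is
    /// `(100 << 4) | 12`.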
fn serialize_miniblocks(
miniblocks: MiniBlockCompressed,
rep: Vec<LanceBuffer>,
def: Vec<LanceBuffer>,
) -> (LanceBuffer, LanceBuffer) {
let bytes_rep = rep.iter().map(|r| r.len()).sum::<usize>();
let bytes_def = def.iter().map(|d| d.len()).sum::<usize>();
let max_bytes_repdef_len = rep.len() * 4;
let mut data_buffer = Vec::with_capacity(
miniblocks.data.len()
+ bytes_rep
+ bytes_def
+ max_bytes_repdef_len
+ MINIBLOCK_MAX_PADDING as usize,
);
let mut meta_buffer = Vec::with_capacity(miniblocks.data.len() * 2);
let mut value_offset = 0;
for ((chunk, rep), def) in miniblocks.chunks.into_iter().zip(rep).zip(def) {
            // Total chunk size: values + rep levels + def levels + two u16 lengths
            let chunk_bytes = chunk.num_bytes as u64 + rep.len() as u64 + def.len() as u64 + 4;
            // The size must fit in the 12-bit size field of the metadata word
            assert!(chunk_bytes <= 16 * 1024);
            assert!(chunk_bytes > 0);
            // Sizes are stored in MINIBLOCK_SIZE_MULTIPLIER-byte units; pad the
            // chunk out to the next multiple
            let divided_bytes = (chunk_bytes - 1).div_ceil(MINIBLOCK_SIZE_MULTIPLIER);
            let pad_bytes = (MINIBLOCK_SIZE_MULTIPLIER * divided_bytes) - (chunk_bytes - 1);
            // Upper 12 bits: size in multiplier units, lower 4 bits: log2(num values)
            let metadata = ((divided_bytes << 4) | chunk.log_num_values as u64) as u16;
meta_buffer.extend_from_slice(&metadata.to_le_bytes());
assert!(rep.len() < u16::MAX as usize);
assert!(def.len() < u16::MAX as usize);
let bytes_rep = rep.len() as u16;
let bytes_def = def.len() as u16;
data_buffer.extend_from_slice(&bytes_rep.to_le_bytes());
data_buffer.extend_from_slice(&bytes_def.to_le_bytes());
data_buffer.extend_from_slice(&rep);
data_buffer.extend_from_slice(&def);
let num_value_bytes = chunk.num_bytes as usize;
let values =
&miniblocks.data[value_offset as usize..value_offset as usize + num_value_bytes];
data_buffer.extend_from_slice(values);
data_buffer.extend(iter::repeat(0).take(pad_bytes as usize));
value_offset += num_value_bytes as u64;
}
(
LanceBuffer::Owned(data_buffer),
LanceBuffer::Owned(meta_buffer),
)
}
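    /// Compresses repetition or definition levels into one buffer per chunk
    ///
    /// When there are no levels (no nesting, or no nulls) this returns empty
    /// buffers and describes the levels as the constant zero.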
fn compress_levels(
levels: Option<LevelBuffer>,
num_values: u64,
compression_strategy: &dyn CompressionStrategy,
chunks: &[MiniBlockChunk],
) -> Result<(Vec<LanceBuffer>, pb::ArrayEncoding)> {
if let Some(levels) = levels {
debug_assert_eq!(num_values as usize, levels.len());
let mut levels_buf = LanceBuffer::reinterpret_vec(levels);
let levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
data: levels_buf.borrow_and_clone(),
bits_per_value: 16,
num_values,
block_info: BlockInfo::new(),
used_encoding: UsedEncoding::new(),
});
let levels_field = Field::new_arrow("", DataType::UInt16, false)?;
let (compressor, compressor_desc) =
compression_strategy.create_block_compressor(&levels_field, &levels_block)?;
let mut buffers = Vec::with_capacity(chunks.len());
let mut off = 0;
let mut values_counter = 0;
for chunk in chunks {
let chunk_num_values = chunk.num_values(values_counter, num_values);
values_counter += chunk_num_values;
let level_bytes = chunk_num_values as usize * 2;
let chunk_levels = levels_buf.slice_with_length(off, level_bytes);
let chunk_levels_block = DataBlock::FixedWidth(FixedWidthDataBlock {
data: chunk_levels,
bits_per_value: 16,
num_values: chunk_num_values,
block_info: BlockInfo::new(),
used_encoding: UsedEncoding::new(),
});
let compressed_levels = compressor.compress(chunk_levels_block)?;
off += level_bytes;
buffers.push(compressed_levels);
}
Ok((buffers, compressor_desc))
        } else {
            // No levels to store; emit an empty buffer per chunk and describe
            // the levels as the constant zero
            let data = chunks.iter().map(|_| LanceBuffer::empty()).collect();
            let scalar = 0_u16.to_le_bytes().to_vec();
            let encoding = ProtobufUtils::constant(scalar, num_values);
            Ok((data, encoding))
        }
}
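    /// Encodes one page of narrow data with the mini-block layout
    ///
    /// The validity is stripped from the data block (the definition levels
    /// carry it instead); the values, repetition levels, and definition levels
    /// are compressed per chunk and serialized into a metadata buffer and a
    /// value buffer.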
fn encode_miniblock(
column_idx: u32,
field: &Field,
compression_strategy: &dyn CompressionStrategy,
arrays: Vec<ArrayRef>,
repdefs: Vec<RepDefBuilder>,
num_values: u64,
row_number: u64,
) -> Result<EncodedPage> {
let repdef = RepDefBuilder::serialize(repdefs);
let data = DataBlock::from_arrays(&arrays, num_values);
        let num_values = data.num_values();
        // The definition levels carry the validity, so it can be stripped from
        // the data block itself
        let data = data.remove_validity();
let compressor = compression_strategy.create_miniblock_compressor(field, &data)?;
let (compressed_data, value_encoding) = compressor.compress(data)?;
let (compressed_rep, rep_encoding) = Self::compress_levels(
repdef.repetition_levels,
num_values,
compression_strategy,
&compressed_data.chunks,
)?;
let (compressed_def, def_encoding) = Self::compress_levels(
repdef.definition_levels,
num_values,
compression_strategy,
&compressed_data.chunks,
)?;
let (block_value_buffer, block_meta_buffer) =
Self::serialize_miniblocks(compressed_data, compressed_rep, compressed_def);
let description = ProtobufUtils::miniblock(rep_encoding, def_encoding, value_encoding);
Ok(EncodedPage {
num_rows: num_values,
column_idx,
data: vec![block_meta_buffer, block_value_buffer],
description: PageEncoding::Structural(description),
row_number,
})
}
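    /// Encodes the buffered arrays on a CPU worker thread, choosing the
    /// mini-block layout for narrow rows (wide rows are still a TODO)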
fn do_flush(
&mut self,
arrays: Vec<ArrayRef>,
repdefs: Vec<RepDefBuilder>,
row_number: u64,
) -> Result<Vec<EncodeTask>> {
let column_idx = self.column_index;
let compression_strategy = self.compression_strategy.clone();
let field = self.field.clone();
let task = spawn_cpu(move || {
let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
let num_bytes = arrays
.iter()
.map(|arr| arr.get_buffer_memory_size() as u64)
.sum();
if Self::is_narrow(num_values, num_bytes) {
Self::encode_miniblock(
column_idx,
&field,
compression_strategy.as_ref(),
arrays,
repdefs,
num_values,
row_number,
)
} else {
todo!("Full zipped encoding")
}
})
.boxed();
Ok(vec![task])
}
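    /// Records an array's validity bitmap (or its absence) in the `RepDefBuilder`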
fn extract_validity_buf(array: &dyn Array, repdef: &mut RepDefBuilder) {
if let Some(validity) = array.nulls() {
repdef.add_validity_bitmap(validity.clone());
} else {
repdef.add_no_null(array.len());
}
}
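    /// Recursively extracts validity into the `RepDefBuilder`
    ///
    /// `Null` arrays contribute an all-invalid bitmap and fixed-size lists
    /// recurse into their child values; dictionaries are expected to have been
    /// handled before this point (hence the `unreachable!`).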
fn extract_validity(array: &dyn Array, repdef: &mut RepDefBuilder) {
match array.data_type() {
DataType::Null => {
repdef.add_validity_bitmap(NullBuffer::new(BooleanBuffer::new_unset(array.len())));
}
DataType::FixedSizeList(_, _) => {
Self::extract_validity_buf(array, repdef);
Self::extract_validity(array.as_fixed_size_list().values(), repdef);
}
DataType::Dictionary(_, _) => {
unreachable!()
}
_ => Self::extract_validity_buf(array, repdef),
}
}
}
impl FieldEncoder for PrimitiveStructuralEncoder {
fn maybe_encode(
&mut self,
array: ArrayRef,
_external_buffers: &mut OutOfLineBuffers,
mut repdef: RepDefBuilder,
row_number: u64,
) -> Result<Vec<EncodeTask>> {
Self::extract_validity(array.as_ref(), &mut repdef);
self.accumulated_repdefs.push(repdef);
if let Some((arrays, row_number)) = self.accumulation_queue.insert(array, row_number) {
let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
Ok(self.do_flush(arrays, accumulated_repdefs, row_number)?)
} else {
Ok(vec![])
}
}
fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
if let Some((arrays, row_number)) = self.accumulation_queue.flush() {
let accumulated_repdefs = std::mem::take(&mut self.accumulated_repdefs);
Ok(self.do_flush(arrays, accumulated_repdefs, row_number)?)
} else {
Ok(vec![])
}
}
fn num_columns(&self) -> u32 {
1
}
fn finish(
&mut self,
_external_buffers: &mut OutOfLineBuffers,
) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
}
}