use std::io::Write;
use byteorder::{LittleEndian, WriteBytesExt};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::FieldType;
use crate::segment::reader::SegmentReader;
use crate::segment::types::SegmentFiles;
use crate::structures::fast_field::{
BLOCK_INDEX_ENTRY_SIZE, BlockIndexEntry, FastFieldColumnType, FastFieldTocEntry,
FastFieldWriter, write_fast_field_toc_and_footer,
};
use super::SegmentMerger;

impl SegmentMerger {
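    /// Merges the fast-field columns of every source segment into the target
    /// segment's fast file by stacking raw blocks back to back, without
    /// re-encoding them. Segments that never stored a column contribute
    /// synthesized default blocks so document ids stay aligned. Returns the
    /// total number of bytes written (column payloads + TOC + footer).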
pub(super) async fn merge_fast_fields<D: Directory + DirectoryWriter>(
&self,
dir: &D,
segments: &[SegmentReader],
files: &SegmentFiles,
) -> Result<usize> {
let fast_fields: Vec<(u32, FieldType)> = self
.schema
.fields()
.filter(|(_, entry)| {
entry.fast
&& matches!(
entry.field_type,
FieldType::U64 | FieldType::I64 | FieldType::F64 | FieldType::Text
)
})
.map(|(field, entry)| (field.0, entry.field_type.clone()))
.collect();
if fast_fields.is_empty() {
return Ok(0);
}
let has_data = segments.iter().any(|s| !s.fast_fields().is_empty());
if !has_data {
return Ok(0);
}
let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let mut sorted_fields = fast_fields;
        sorted_fields.sort_by_key(|&(id, _)| id);
let mut fast_writer = dir.streaming_writer(&files.fast).await?;
let mut toc_entries: Vec<FastFieldTocEntry> = Vec::with_capacity(sorted_fields.len());
let mut current_offset = 0u64;
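        // Columns are laid out back to back; each TOC entry records the byte
        // range its column occupies in the fast file.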
for &(field_id, ref field_type) in &sorted_fields {
let is_multi = self
.schema
.get_field_entry(crate::dsl::Field(field_id))
.map(|e| e.multi)
.unwrap_or(false);
let column_type = match field_type {
FieldType::U64 => FastFieldColumnType::U64,
FieldType::I64 => FastFieldColumnType::I64,
FieldType::F64 => FastFieldColumnType::F64,
FieldType::Text => FastFieldColumnType::TextOrdinal,
_ => continue,
};
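            // Collect one SourceBlock per stored block in every segment; a
            // segment that lacks this column still owns its doc-id range, so
            // it is represented by a synthesized default block instead.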
let mut all_blocks: Vec<SourceBlock> = Vec::new();
for segment in segments.iter() {
let num_docs = segment.num_docs();
match segment.fast_field(field_id) {
Some(reader) => {
for block in reader.blocks() {
all_blocks.push(SourceBlock::Raw {
num_docs: block.num_docs,
data: block.data.as_slice(),
                                dict_count: block.dict.as_ref().map(|d| d.len() as u32).unwrap_or(0),
dict_bytes: block.raw_dict.as_slice(),
});
}
}
None => {
if num_docs > 0 {
all_blocks.push(SourceBlock::Missing {
num_docs,
is_multi,
column_type,
});
}
}
}
}
let bytes_written = write_merged_column(
&mut *fast_writer,
field_id,
column_type,
is_multi,
total_docs,
&all_blocks,
)
.map_err(crate::Error::Io)?;
toc_entries.push(FastFieldTocEntry {
field_id,
column_type,
multi: is_multi,
data_offset: current_offset,
data_len: bytes_written,
num_docs: total_docs,
dict_offset: 0,
dict_count: 0,
});
current_offset += bytes_written;
}
let toc_offset = current_offset;
write_fast_field_toc_and_footer(&mut *fast_writer, toc_offset, &toc_entries)
.map_err(crate::Error::Io)?;
fast_writer.finish()?;
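        // Rough on-disk total for the log line below. Assumes each serialized
        // FastFieldTocEntry takes 38 bytes and the footer 16; this must stay
        // in sync with write_fast_field_toc_and_footer's encoding.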
let total_bytes = toc_offset as usize + toc_entries.len() * 38 + 16;
log::info!(
"[merge] fast-fields: {} columns, {} docs, {} (raw block stacking)",
toc_entries.len(),
total_docs,
super::format_bytes(total_bytes)
);
Ok(total_bytes)
}
}
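
/// One block of a column being merged: either raw bytes copied verbatim from
/// a source segment, or a marker for a segment that never stored the column
/// and needs a synthesized all-default block in its place.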
enum SourceBlock<'a> {
Raw {
num_docs: u32,
data: &'a [u8],
dict_count: u32,
dict_bytes: &'a [u8],
},
Missing {
num_docs: u32,
is_multi: bool,
column_type: FastFieldColumnType,
},
}
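
/// Writes one merged column to `writer`: a little-endian `u32` block count,
/// the per-block index entries, then each block's data and dictionary bytes
/// in block order. Returns the total number of bytes written.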
fn write_merged_column(
writer: &mut dyn Write,
_field_id: u32,
_column_type: FastFieldColumnType,
_is_multi: bool,
_total_docs: u32,
blocks: &[SourceBlock],
) -> std::io::Result<u64> {
let mut index_entries: Vec<BlockIndexEntry> = Vec::with_capacity(blocks.len());
let mut block_payloads: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(blocks.len());
for block in blocks {
match block {
SourceBlock::Raw {
num_docs,
data,
dict_count,
dict_bytes,
} => {
index_entries.push(BlockIndexEntry {
num_docs: *num_docs,
data_len: data.len() as u32,
dict_count: *dict_count,
dict_len: dict_bytes.len() as u32,
});
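                // Raw payloads are written straight from the borrowed slices
                // in the second pass; push an empty placeholder so
                // `block_payloads` stays index-aligned with `blocks`.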
                block_payloads.push((Vec::new(), Vec::new()));
            }
SourceBlock::Missing {
num_docs,
is_multi: blk_multi,
column_type: blk_type,
} => {
let (data_buf, dict_buf, dict_count) =
synthesize_zero_block(*num_docs, *blk_multi, *blk_type)?;
index_entries.push(BlockIndexEntry {
num_docs: *num_docs,
data_len: data_buf.len() as u32,
dict_count,
dict_len: dict_buf.len() as u32,
});
block_payloads.push((data_buf, dict_buf));
}
}
}
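    // Layout: [u32 block count][block index entries][data + dict payloads].
    // Sanity check (assumption: the stacked blocks are expected to cover
    // every merged document exactly once).
    debug_assert_eq!(
        index_entries.iter().map(|e| e.num_docs as u64).sum::<u64>(),
        _total_docs as u64
    );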
let mut total = 0u64;
writer.write_u32::<LittleEndian>(blocks.len() as u32)?;
total += 4;
for entry in &index_entries {
entry.write_to(writer)?;
}
total += (blocks.len() * BLOCK_INDEX_ENTRY_SIZE) as u64;
for (i, block) in blocks.iter().enumerate() {
match block {
SourceBlock::Raw {
data, dict_bytes, ..
} => {
writer.write_all(data)?;
total += data.len() as u64;
writer.write_all(dict_bytes)?;
total += dict_bytes.len() as u64;
}
SourceBlock::Missing { .. } => {
                let (data_buf, dict_buf) = &block_payloads[i];
writer.write_all(data_buf)?;
total += data_buf.len() as u64;
writer.write_all(dict_buf)?;
total += dict_buf.len() as u64;
}
}
}
Ok(total)
}
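
/// Synthesizes an all-default block of `num_docs` values for a segment that
/// never stored this column: runs a fresh `FastFieldWriter`, pads it, and
/// slices the block payload back out of the serialized column image.
/// Returns `(data_bytes, dict_bytes, dict_count)`.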
fn synthesize_zero_block(
num_docs: u32,
is_multi: bool,
column_type: FastFieldColumnType,
) -> std::io::Result<(Vec<u8>, Vec<u8>, u32)> {
let mut writer = if is_multi {
match column_type {
FastFieldColumnType::TextOrdinal => FastFieldWriter::new_text_multi(),
_ => FastFieldWriter::new_numeric_multi(column_type),
}
} else {
match column_type {
FastFieldColumnType::TextOrdinal => FastFieldWriter::new_text(),
_ => FastFieldWriter::new_numeric(column_type),
}
};
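    // Assumption: pad_to fills the writer with default values (zero for
    // numeric columns, the empty/null ordinal for text) for `num_docs` docs.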
writer.pad_to(num_docs);
let mut buf = Vec::new();
let (_toc, _total) = writer.serialize(&mut buf, 0)?;
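    // serialize() emits a whole column image: [u32 block count][index entry]
    // [data payload][dict payload]. Re-read the first index entry to slice
    // the raw payloads back out. Assumption: the padded writer produces at
    // most one block; any additional blocks would be silently dropped here.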
if buf.len() < 4 + BLOCK_INDEX_ENTRY_SIZE {
return Ok((Vec::new(), Vec::new(), 0));
}
let mut cursor = std::io::Cursor::new(&buf[4..4 + BLOCK_INDEX_ENTRY_SIZE]);
let entry = BlockIndexEntry::read_from(&mut cursor)?;
let data_start = 4 + BLOCK_INDEX_ENTRY_SIZE;
let data_end = data_start + entry.data_len as usize;
let dict_end = data_end + entry.dict_len as usize;
let data_bytes = buf[data_start..data_end].to_vec();
let dict_bytes = if dict_end > data_end {
buf[data_end..dict_end].to_vec()
} else {
Vec::new()
};
Ok((data_bytes, dict_bytes, entry.dict_count))
}