use std::collections::{BTreeMap, HashMap};
use std::ops::RangeInclusive;
use std::path::Path;
use std::time::Instant;
use anyhow::Context;
use async_trait::async_trait;
use fail::fail_point;
use itertools::{izip, Itertools};
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorRunner, Handler, Mailbox, QueueCapacity,
};
use quickwit_common::split_file;
use quickwit_directories::{BundleDirectory, UnionDirectory};
use quickwit_doc_mapper::QUICKWIT_TOKENIZER_MANAGER;
use quickwit_metastore::checkpoint::CheckpointDelta;
use quickwit_metastore::SplitMetadata;
use tantivy::directory::{DirectoryClone, MmapDirectory, RamDirectory};
use tantivy::fastfield::{DynamicFastFieldReader, FastFieldReader};
use tantivy::{
demux, DemuxMapping, Directory, DocIdToSegmentOrdinal, Index, IndexMeta, Segment, SegmentId,
SegmentReader, TantivyError,
};
use tracing::{debug, info, info_span, Span};
use crate::actors::Packager;
use crate::controlled_directory::ControlledDirectory;
use crate::merge_policy::MergeOperation;
use crate::models::{IndexedSplit, IndexedSplitBatch, MergeScratch, ScratchDirectory};
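/// Actor in charge of merging and demuxing splits.
///
/// It processes `MergeScratch` messages, runs the merge or demux operation on
/// the downloaded splits, and sends the resulting `IndexedSplitBatch` to the
/// merge packager.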
pub struct MergeExecutor {
index_id: String,
merge_packager_mailbox: Mailbox<Packager>,
timestamp_field_name: Option<String>,
demux_field_name: Option<String>,
min_demuxed_split_num_docs: usize,
max_demuxed_split_num_docs: usize,
}
#[async_trait]
impl Actor for MergeExecutor {
type ObservableState = ();
fn runner(&self) -> ActorRunner {
ActorRunner::DedicatedThread
}
fn observable_state(&self) -> Self::ObservableState {}
fn queue_capacity(&self) -> QueueCapacity {
QueueCapacity::Bounded(1)
}
fn name(&self) -> String {
"MergeExecutor".to_string()
}
}
#[async_trait]
impl Handler<MergeScratch> for MergeExecutor {
type Reply = ();
fn message_span(&self, msg_id: u64, merge_scratch: &MergeScratch) -> Span {
match &merge_scratch.merge_operation {
MergeOperation::Merge {
merge_split_id,
splits,
} => {
let num_docs: usize = splits.iter().map(|split| split.num_docs).sum();
let in_merge_split_ids: Vec<String> = splits
.iter()
.map(|split| split.split_id().to_string())
.collect();
info_span!("merge",
msg_id=&msg_id,
dir=%merge_scratch.merge_scratch_directory.path().display(),
merge_split_id=%merge_split_id,
in_merge_split_ids=?in_merge_split_ids,
num_docs=num_docs,
num_splits=splits.len())
}
MergeOperation::Demux {
demux_split_ids,
splits,
} => {
let num_docs: usize = splits.iter().map(|split| split.num_docs).sum();
let in_demux_split_ids: Vec<String> = splits
.iter()
.map(|split| split.split_id().to_string())
.collect();
info_span!("demux",
msg_id=&msg_id,
dir=%merge_scratch.merge_scratch_directory.path().display(),
demux_split_ids=?demux_split_ids,
in_demux_split_ids=?in_demux_split_ids,
num_docs=num_docs,
num_splits=splits.len())
}
}
}
async fn handle(
&mut self,
merge_scratch: MergeScratch,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
match merge_scratch.merge_operation {
MergeOperation::Merge {
merge_split_id: split_id,
splits,
} => {
self.process_merge(
split_id,
splits,
merge_scratch.tantivy_dirs,
merge_scratch.merge_scratch_directory,
ctx,
)
.await?;
}
MergeOperation::Demux {
demux_split_ids,
splits,
} => {
self.process_demux(
demux_split_ids,
splits,
merge_scratch.merge_scratch_directory,
merge_scratch.downloaded_splits_directory,
ctx,
)
.await?;
}
}
Ok(())
}
}
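/// Combines a list of `IndexMeta` into a single `IndexMeta` holding the union
/// of their segments. All other attributes (schema, index settings, ...) are
/// taken from the last `IndexMeta` of the list.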
fn combine_index_meta(mut index_metas: Vec<IndexMeta>) -> anyhow::Result<IndexMeta> {
let mut union_index_meta = index_metas.pop().context("Expected at least one IndexMeta.")?;
for index_meta in index_metas {
union_index_meta.segments.extend(index_meta.segments);
}
Ok(union_index_meta)
}
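/// Opens the tantivy directories of the splits to merge and returns them along
/// with the union of their `IndexMeta`.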
fn open_split_directories(
tantivy_dirs: &[Box<dyn Directory>],
) -> anyhow::Result<(IndexMeta, Vec<Box<dyn Directory>>)> {
let mut directories: Vec<Box<dyn Directory>> = Vec::new();
let mut index_metas = Vec::new();
for tantivy_dir in tantivy_dirs {
directories.push(tantivy_dir.clone());
let index_meta = open_index(tantivy_dir.clone())?.load_metas()?;
index_metas.push(index_meta);
}
let union_index_meta = combine_index_meta(index_metas)?;
Ok((union_index_meta, directories))
}
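/// Writes the given `IndexMeta` as `meta.json` into a fresh `RamDirectory`.
/// Stacked on top of the split directories, this directory shadows their
/// individual `meta.json` files with the combined one.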
fn create_shadowing_meta_json_directory(index_meta: IndexMeta) -> anyhow::Result<RamDirectory> {
let union_index_meta_json = serde_json::to_string_pretty(&index_meta)?;
let ram_directory = RamDirectory::default();
ram_directory.atomic_write(Path::new("meta.json"), union_index_meta_json.as_bytes())?;
Ok(ram_directory)
}
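/// Returns the smallest time range enclosing the time ranges of all the given
/// splits, or `None` if no split carries a time range.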
fn merge_time_range(splits: &[SplitMetadata]) -> Option<RangeInclusive<i64>> {
splits
.iter()
.flat_map(|split| split.time_range.clone())
.flat_map(|time_range| vec![*time_range.start(), *time_range.end()].into_iter())
.minmax()
.into_option()
.map(|(min_timestamp, max_timestamp)| min_timestamp..=max_timestamp)
}
fn sum_doc_sizes_in_bytes(splits: &[SplitMetadata]) -> u64 {
splits
.iter()
.map(|split| split.original_size_in_bytes)
.sum::<u64>()
}
fn sum_num_docs(splits: &[SplitMetadata]) -> u64 {
splits.iter().map(|split| split.num_docs as u64).sum()
}
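/// Merges all of the searchable segments of the given index into a single
/// segment. This is a no-op if the index has one segment or less.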
fn merge_all_segments(index: &Index) -> anyhow::Result<()> {
let segment_ids: Vec<SegmentId> = index
.searchable_segment_metas()?
.into_iter()
.map(|segment_meta| segment_meta.id())
.collect();
if segment_ids.len() <= 1 {
return Ok(());
}
debug!(segment_ids=?segment_ids,"merging-segments");
let mut index_writer = index.writer_with_num_threads(1, 10_000_000)?;
index_writer.merge(&segment_ids).wait()?;
Ok(())
}
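/// Merges the split directories into a single segment written to `output_path`.
///
/// The merge runs over a `UnionDirectory` stacking, from top to bottom: the
/// controlled output directory, the shadowing `meta.json` directory, and the
/// split directories themselves.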
fn merge_split_directories(
union_index_meta: IndexMeta,
split_directories: Vec<Box<dyn Directory>>,
output_path: &Path,
ctx: &ActorContext<MergeExecutor>,
) -> anyhow::Result<ControlledDirectory> {
let shadowing_meta_json_directory = create_shadowing_meta_json_directory(union_index_meta)?;
let output_directory = ControlledDirectory::new(
Box::new(MmapDirectory::open(output_path)?),
ctx.progress().clone(),
ctx.kill_switch().clone(),
);
let mut directory_stack: Vec<Box<dyn Directory>> = vec![
output_directory.box_clone(),
Box::new(shadowing_meta_json_directory),
];
directory_stack.extend(split_directories.into_iter());
let union_directory = UnionDirectory::union_of(directory_stack);
let union_index = open_index(union_directory)?;
ctx.record_progress();
let _protect_guard = ctx.protect_zone();
merge_all_segments(&union_index)?;
Ok(output_directory)
}
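/// Creates a `ControlledDirectory` wrapping an `MmapDirectory` at the given
/// path, so that demux I/O records progress and observes the kill switch.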
fn create_demux_output_directory(
directory_path: &Path,
ctx: &ActorContext<MergeExecutor>,
) -> tantivy::Result<ControlledDirectory> {
let mmap_directory = MmapDirectory::open(directory_path)?;
Ok(ControlledDirectory::new(
Box::new(mmap_directory),
ctx.progress().clone(),
ctx.kill_switch().clone(),
))
}
impl MergeExecutor {
pub fn new(
index_id: String,
merge_packager_mailbox: Mailbox<Packager>,
timestamp_field_name: Option<String>,
demux_field_name: Option<String>,
min_demuxed_split_num_docs: usize,
max_demuxed_split_num_docs: usize,
) -> Self {
MergeExecutor {
index_id,
merge_packager_mailbox,
timestamp_field_name,
demux_field_name,
min_demuxed_split_num_docs,
max_demuxed_split_num_docs,
}
}
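/// Merges the given splits into a single split and sends it to the merge
/// packager.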
async fn process_merge(
&mut self,
split_merge_id: String,
splits: Vec<SplitMetadata>,
tantivy_dirs: Vec<Box<dyn Directory>>,
merge_scratch_directory: ScratchDirectory,
ctx: &ActorContext<Self>,
) -> anyhow::Result<()> {
let start = Instant::now();
info!("merge-start");
let replaced_split_ids: Vec<String> = splits
.iter()
.map(|split| split.split_id().to_string())
.collect();
let (union_index_meta, split_directories) = open_split_directories(&tantivy_dirs)?;
fail_point!("before-merge-split");
let controlled_directory = merge_split_directories(
union_index_meta,
split_directories,
merge_scratch_directory.path(),
ctx,
)?;
fail_point!("after-merge-split");
info!(
elapsed_secs = start.elapsed().as_secs_f32(),
"merge-success"
);
let time_range = merge_time_range(&splits);
let docs_size_in_bytes = sum_doc_sizes_in_bytes(&splits);
let num_docs = sum_num_docs(&splits);
let merged_index = open_index(controlled_directory.clone())?;
ctx.record_progress();
let index_writer = merged_index.writer_with_num_threads(1, 3_000_000)?;
ctx.record_progress();
let indexed_split = IndexedSplit {
split_id: split_merge_id,
index_id: self.index_id.clone(),
replaced_split_ids,
time_range,
demux_num_ops: 0,
num_docs,
docs_size_in_bytes,
split_date_of_birth: Instant::now(),
checkpoint_delta: CheckpointDelta::default(),
index: merged_index,
index_writer,
split_scratch_directory: merge_scratch_directory,
controlled_directory_opt: Some(controlled_directory),
};
ctx.send_message(
&self.merge_packager_mailbox,
IndexedSplitBatch {
splits: vec![indexed_split],
},
)
.await?;
Ok(())
}
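/// Demuxes the given splits into `demux_split_ids.len()` new splits, grouping
/// docs by their demux field value, and sends them to the merge packager.
///
/// High-level steps:
/// 1. Load the replaced segments and fast field readers on the demux field.
/// 2. Count docs per demux value and plan the target "virtual" splits.
/// 3. Build the doc to output-segment mapping and run tantivy's `demux`.
/// 4. Wrap each resulting index into an `IndexedSplit`.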
async fn process_demux(
&mut self,
demux_split_ids: Vec<String>,
splits: Vec<SplitMetadata>,
merge_scratch_directory: ScratchDirectory,
downloaded_splits_directory: ScratchDirectory,
ctx: &ActorContext<Self>,
) -> anyhow::Result<()> {
let start = Instant::now();
info!("demux-start");
let demux_field_name = self
.demux_field_name
.as_ref()
.expect("`process_demux` cannot be called without a demux field.");
let replaced_split_ids = splits
.iter()
.map(|split| split.split_id().to_string())
.collect_vec();
let (index_metas, replaced_segments) =
load_metas_and_segments(downloaded_splits_directory.path(), &replaced_split_ids)?;
ctx.record_progress();
info!("open-readers");
let (replaced_segments_num_docs, replaced_segments_demux_field_readers) =
demux_field_readers(&replaced_segments, demux_field_name, ctx)?;
let mut virtual_split_with_all_docs = VirtualSplit::new(BTreeMap::new());
for (demux_value_reader, num_docs) in replaced_segments_demux_field_readers
.iter()
.zip(replaced_segments_num_docs.iter())
{
for doc_id in 0..*num_docs {
virtual_split_with_all_docs.add_docs(demux_value_reader.get(doc_id as u32), 1);
}
ctx.record_progress();
}
info!("demux-virtual-split");
let demuxed_virtual_splits = demux_virtual_split(
virtual_split_with_all_docs,
self.min_demuxed_split_num_docs,
self.max_demuxed_split_num_docs,
demux_split_ids.len(),
);
ctx.record_progress();
info!("demux-build-mapping");
let demux_mapping = build_demux_mapping(
replaced_segments_num_docs,
replaced_segments_demux_field_readers,
demuxed_virtual_splits,
);
let demuxed_scratched_directories: Vec<ScratchDirectory> = (0..demux_split_ids.len())
.map(|idx| merge_scratch_directory.named_temp_child(format!("demux-split-{}", idx)))
.try_collect()?;
let demuxed_split_directories: Vec<ControlledDirectory> = demuxed_scratched_directories
.iter()
.map(|directory| create_demux_output_directory(directory.path(), ctx))
.try_collect()?;
ctx.record_progress();
let union_index_meta = combine_index_meta(index_metas)?;
let boxed_demuxed_split_directories: Vec<Box<dyn Directory>> = demuxed_split_directories
.iter()
.map(DirectoryClone::box_clone)
.collect();
info!("demux-tantivy");
let indexes = {
let _protect_guard = ctx.protect_zone();
demux(
&replaced_segments,
&demux_mapping,
union_index_meta.index_settings,
boxed_demuxed_split_directories,
)?
};
ctx.record_progress();
info!(elapsed_secs = start.elapsed().as_secs_f32(), "demux-stop");
let mut indexed_splits = Vec::new();
let total_docs_size_in_bytes = sum_doc_sizes_in_bytes(&splits);
let total_num_docs = sum_num_docs(&splits);
let initial_demux_num_ops = splits
.iter()
.map(|split| split.demux_num_ops)
.max()
.expect("Demux operation expects at least one input split.");
for (split_id, index, scratched_directory, controlled_directory) in izip!(
demux_split_ids,
indexes,
demuxed_scratched_directories,
demuxed_split_directories
) {
let searchable_segments = index.searchable_segments()?;
assert_eq!(
searchable_segments.len(),
1,
"Demux should output indexes with only one segment."
);
let segment = searchable_segments.into_iter().next().unwrap();
let segment_reader = SegmentReader::open(&segment)?;
let num_docs = segment_reader.num_docs() as usize;
let docs_size_in_bytes =
(num_docs as f32 * total_docs_size_in_bytes as f32 / total_num_docs as f32) as u64;
let time_range = if let Some(ref timestamp_field_name) = self.timestamp_field_name {
let timestamp_field = segment_reader
.schema()
.get_field(timestamp_field_name)
.ok_or_else(|| TantivyError::SchemaError(format!("Timestamp field `{}` does not exist.", timestamp_field_name)))?;
let reader = segment_reader.fast_fields().i64(timestamp_field)?;
Some(RangeInclusive::new(reader.min_value(), reader.max_value()))
} else {
None
};
let index_writer = index.writer_with_num_threads(1, 3_000_000)?;
let indexed_split = IndexedSplit {
split_id,
index_id: self.index_id.clone(),
replaced_split_ids: replaced_split_ids.clone(),
time_range,
demux_num_ops: initial_demux_num_ops + 1,
num_docs: num_docs as u64,
docs_size_in_bytes,
split_date_of_birth: Instant::now(),
checkpoint_delta: CheckpointDelta::default(),
index,
index_writer,
split_scratch_directory: scratched_directory,
controlled_directory_opt: Some(controlled_directory),
};
indexed_splits.push(indexed_split);
ctx.record_progress();
}
assert_eq!(
splits.iter().map(|split| split.num_docs).sum::<usize>() as u64,
indexed_splits
.iter()
.map(|split| split.num_docs)
.sum::<u64>()
);
ctx.send_message(
&self.merge_packager_mailbox,
IndexedSplitBatch {
splits: indexed_splits,
},
)
.await?;
Ok(())
}
}
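/// Opens a tantivy `Index` on the given directory and registers the Quickwit
/// tokenizers on it.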
fn open_index<T: Into<Box<dyn Directory>>>(directory: T) -> tantivy::Result<Index> {
let mut index = Index::open(directory)?;
index.set_tokenizers(QUICKWIT_TOKENIZER_MANAGER.clone());
Ok(index)
}
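/// Loads the `IndexMeta` and the unique `Segment` of each split bundle found
/// in `directory_path`. Splits about to be demuxed are expected to contain
/// exactly one segment.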
pub fn load_metas_and_segments(
directory_path: &Path,
split_ids: &[String],
) -> anyhow::Result<(Vec<IndexMeta>, Vec<Segment>)> {
let mmap_directory = MmapDirectory::open(directory_path)?;
let mut replaced_segments = Vec::new();
let mut index_metas = Vec::new();
for split_id in split_ids.iter() {
let split_filename = split_file(split_id);
let split_fileslice = mmap_directory.open_read(Path::new(&split_filename))?;
let split_directory = BundleDirectory::open_split(split_fileslice)?;
let index = open_index(split_directory)?;
index_metas.push(index.load_metas()?);
let searchable_segments = index.searchable_segments()?;
assert_eq!(
searchable_segments.len(),
1,
"Only one segment is expected for a split that is going to be demuxed."
);
let segment = searchable_segments.into_iter().next().unwrap();
replaced_segments.push(segment);
}
Ok((index_metas, replaced_segments))
}
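/// Returns, for each segment, its number of docs and a lenient `u64` fast
/// field reader on the demux field.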
pub fn demux_field_readers(
segments: &[Segment],
demux_field_name: &str,
ctx: &ActorContext<MergeExecutor>,
) -> anyhow::Result<(Vec<usize>, Vec<DynamicFastFieldReader<u64>>)> {
let mut segments_num_docs = Vec::new();
let mut segments_demux_value_readers = Vec::new();
for segment in segments {
let segment_reader = SegmentReader::open(segment)?;
segments_num_docs.push(segment_reader.num_docs() as usize);
let field = segment_reader
.schema()
.get_field(demux_field_name)
.ok_or_else(|| TantivyError::SchemaError(format!("Demux field `{}` does not exist.", demux_field_name)))?;
let reader = segment_reader.fast_fields().u64_lenient(field)?;
segments_demux_value_readers.push(reader);
ctx.record_progress();
}
Ok((segments_num_docs, segments_demux_value_readers))
}
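/// Builds the `DemuxMapping` assigning each doc of the replaced segments to an
/// output split ordinal.
///
/// For each demux value, the target splits declare "stocks" of docs to fill.
/// Docs are then streamed in order and consume the stocks of their demux
/// value, lowest split ordinal first.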
pub fn build_demux_mapping(
replaced_segments_num_docs: Vec<usize>,
segments_demux_value_readers: Vec<DynamicFastFieldReader<u64>>,
target_demuxed_splits: Vec<VirtualSplit>,
) -> DemuxMapping {
assert_eq!(
replaced_segments_num_docs.iter().sum::<usize>(),
target_demuxed_splits
.iter()
.map(|split| split.total_num_docs())
.sum::<usize>(),
"Total num docs must be equal between input segments and targeted demuxed splits."
);
let mut num_docs_segment_stocks_by_demux_value = HashMap::new();
for (ordinal, split_map) in target_demuxed_splits.iter().enumerate() {
for (demux_value, &num_docs) in split_map.0.iter() {
num_docs_segment_stocks_by_demux_value
.entry(*demux_value)
.or_insert_with(Vec::new)
.push(SegmentNumDocs {
ordinal: ordinal as u32,
num_docs,
});
}
}
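// Walk the docs of each segment in order and consume the stocks of their
// demux value, always filling the first (lowest ordinal) remaining stock.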
let mut mapping = DemuxMapping::default();
for (segment_demux_value_reader, num_docs) in segments_demux_value_readers
.iter()
.zip(replaced_segments_num_docs.iter())
{
let mut doc_id_to_segment_ordinal = DocIdToSegmentOrdinal::with_max_doc(*num_docs);
for doc_id in 0..*num_docs {
let demux_value = segment_demux_value_reader.get(doc_id as u32);
let segment_num_docs_vec = num_docs_segment_stocks_by_demux_value
.get_mut(&demux_value)
.expect("Demux value must be present.");
segment_num_docs_vec[0].num_docs -= 1;
doc_id_to_segment_ordinal.set(doc_id as u32, segment_num_docs_vec[0].ordinal);
if segment_num_docs_vec[0].num_docs == 0 {
segment_num_docs_vec.remove(0);
}
}
mapping.add(doc_id_to_segment_ordinal);
}
assert!(
num_docs_segment_stocks_by_demux_value
.values()
.all(|stocks| stocks.is_empty()),
"All docs must be placed in new segments."
);
mapping
}
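/// A "virtual" split: a plan mapping each demux value to a number of docs,
/// used to size the output splits before the actual demux is executed.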
#[derive(Debug, Clone)]
pub struct VirtualSplit(BTreeMap<u64, usize>);
impl VirtualSplit {
pub fn new(map: BTreeMap<u64, usize>) -> Self {
Self(map)
}
pub fn total_num_docs(&self) -> usize {
self.0.values().sum()
}
pub fn sorted_demux_values(&self) -> Vec<u64> {
self.0.keys().cloned().collect_vec()
}
pub fn remove_docs(&mut self, demux_value: &u64, num_docs: usize) {
*self
.0
.get_mut(demux_value)
.expect("Cannot remove docs from a missing demux value") -= num_docs;
}
pub fn add_docs(&mut self, demux_value: u64, num_docs: usize) {
*self.0.entry(demux_value).or_insert(0) += num_docs;
}
pub fn num_docs(&self, demux_value: u64) -> usize {
*self.0.get(&demux_value).unwrap_or(&0usize)
}
}
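/// Stock of docs still to be routed to the output segment `ordinal`.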
struct SegmentNumDocs {
ordinal: u32,
num_docs: usize,
}
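/// Splits a virtual split into `output_num_splits` virtual splits, each
/// holding between `min_split_num_docs` and `max_split_num_docs` docs.
///
/// Demux values are processed in ascending order. Each output split is filled
/// until it reaches the lower bound of its allowed doc count; a demux value is
/// cut across several splits only when adding it whole would overshoot the
/// upper bound.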
pub(crate) fn demux_virtual_split(
mut input_split: VirtualSplit,
min_split_num_docs: usize,
max_split_num_docs: usize,
output_num_splits: usize,
) -> Vec<VirtualSplit> {
let total_num_docs = input_split.total_num_docs();
assert!(
max_split_num_docs * output_num_splits >= total_num_docs,
"Input split num docs must be `<= max_split_num_docs * output_num_splits`."
);
assert!(
min_split_num_docs * output_num_splits <= total_num_docs,
"Input split num docs must be `>= min_split_num_docs * output_num_splits`."
);
let input_split_demux_values = input_split.sorted_demux_values();
let mut demuxed_splits = Vec::new();
let mut current_split = VirtualSplit::new(BTreeMap::new());
let mut num_docs_split_bounds = compute_current_split_bounds(
total_num_docs,
output_num_splits - 1,
min_split_num_docs,
max_split_num_docs,
);
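// Greedily fill the current split, demux value by demux value in ascending
// order, without overshooting the current split's upper bound.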
for demux_value in input_split_demux_values.into_iter() {
while input_split.num_docs(demux_value) > 0 {
let num_docs_to_add = if current_split.total_num_docs()
+ input_split.num_docs(demux_value)
<= *num_docs_split_bounds.end()
{
input_split.num_docs(demux_value)
} else {
num_docs_split_bounds.end() - current_split.total_num_docs()
};
current_split.add_docs(demux_value, num_docs_to_add);
input_split.remove_docs(&demux_value, num_docs_to_add);
if current_split.total_num_docs() >= *num_docs_split_bounds.start() {
demuxed_splits.push(current_split.clone());
current_split.0.clear();
if demuxed_splits.len() == output_num_splits {
break;
}
num_docs_split_bounds = compute_current_split_bounds(
input_split.total_num_docs(),
output_num_splits - demuxed_splits.len() - 1,
min_split_num_docs,
max_split_num_docs,
);
}
}
}
assert_eq!(
demuxed_splits
.iter()
.map(|split| split.total_num_docs())
.sum::<usize>(),
total_num_docs,
"Demuxing must keep the same number of docs."
);
assert!(
demuxed_splits
.iter()
.map(|split| split.total_num_docs())
.min()
.unwrap_or(min_split_num_docs)
>= min_split_num_docs,
"Demuxing must satisfy the min contraint on split num docs."
);
assert!(
demuxed_splits
.iter()
.map(|split| split.total_num_docs())
.max()
.unwrap_or(max_split_num_docs)
<= max_split_num_docs,
"Demuxing must satisfy the max contraint on split num docs."
);
assert_eq!(
demuxed_splits.len(),
output_num_splits,
"Demuxing must return exactly the requested number of output splits."
);
demuxed_splits
}
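/// Computes the allowed `[min, max]` doc count for the split currently being
/// filled so that each of the `remaining_num_splits` splits left after it can
/// still hold between `min_split_num_docs` and `max_split_num_docs` docs.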
pub fn compute_current_split_bounds(
remaining_num_docs: usize,
remaining_num_splits: usize,
min_split_num_docs: usize,
max_split_num_docs: usize,
) -> RangeInclusive<usize> {
if remaining_num_splits == 0 {
return RangeInclusive::new(remaining_num_docs, max_split_num_docs);
}
let num_docs_lower_bound = if remaining_num_docs > remaining_num_splits * max_split_num_docs {
std::cmp::max(
min_split_num_docs,
remaining_num_docs - remaining_num_splits * max_split_num_docs,
)
} else {
min_split_num_docs
};
let num_docs_upper_bound = std::cmp::min(
max_split_num_docs,
remaining_num_docs - remaining_num_splits * min_split_num_docs,
);
assert!(
num_docs_lower_bound <= num_docs_upper_bound,
"Num docs lower bound must be <= num docs upper bound."
);
RangeInclusive::new(num_docs_lower_bound, num_docs_upper_bound)
}
#[cfg(test)]
mod tests {
use std::mem;
use quickwit_actors::{create_test_mailbox, Universe};
use quickwit_common::split_file;
use quickwit_metastore::SplitMetadata;
use super::*;
use crate::merge_policy::MergeOperation;
use crate::models::ScratchDirectory;
use crate::{get_tantivy_directory_from_split_bundle, new_split_id, TestSandbox};
#[tokio::test]
async fn test_merge_executor() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let index_id = "test-index";
let doc_mapping_yaml = r#"
field_mappings:
- name: body
type: text
- name: ts
type: i64
fast: true
"#;
let test_sandbox = TestSandbox::create(index_id, doc_mapping_yaml, "{}", &["body"]).await?;
for split_id in 0..4 {
let docs = vec![
serde_json::json!({"body ": format!("split{}", split_id), "ts": 1631072713u64 + split_id }),
];
test_sandbox.add_documents(docs).await?;
}
let metastore = test_sandbox.metastore();
let split_metas: Vec<SplitMetadata> = metastore
.list_all_splits(index_id)
.await?
.into_iter()
.map(|split| split.split_metadata)
.collect();
assert_eq!(split_metas.len(), 4);
let merge_scratch_directory = ScratchDirectory::for_test()?;
let downloaded_splits_directory =
merge_scratch_directory.named_temp_child("downloaded-splits-")?;
let mut tantivy_dirs: Vec<Box<dyn Directory>> = vec![];
for split_meta in &split_metas {
let split_filename = split_file(split_meta.split_id());
let dest_filepath = downloaded_splits_directory.path().join(&split_filename);
test_sandbox
.storage()
.copy_to_file(Path::new(&split_filename), &dest_filepath)
.await?;
tantivy_dirs.push(get_tantivy_directory_from_split_bundle(&dest_filepath).unwrap())
}
let merge_scratch = MergeScratch {
merge_operation: MergeOperation::Merge {
merge_split_id: crate::new_split_id(),
splits: split_metas,
},
tantivy_dirs,
merge_scratch_directory,
downloaded_splits_directory,
};
let (merge_packager_mailbox, merge_packager_inbox) = create_test_mailbox();
let merge_executor = MergeExecutor::new(
index_id.to_string(),
merge_packager_mailbox,
None,
None,
10_000_000,
20_000_000,
);
let universe = Universe::new();
let (merge_executor_mailbox, merge_executor_handle) =
universe.spawn_actor(merge_executor).spawn();
merge_executor_mailbox.send_message(merge_scratch).await?;
merge_executor_handle.process_pending_and_observe().await;
let mut packager_msgs = merge_packager_inbox.drain_for_test();
assert_eq!(packager_msgs.len(), 1);
let packager_msg = packager_msgs
.pop()
.unwrap()
.downcast::<IndexedSplitBatch>()
.unwrap();
assert_eq!(packager_msg.splits[0].num_docs, 4);
assert_eq!(packager_msg.splits[0].docs_size_in_bytes, 136);
let reader = packager_msg.splits[0].index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
Ok(())
}
#[tokio::test]
async fn test_demux_execution() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let index_id = "test-index-demux";
let doc_mapping_yaml = r#"
field_mappings:
- name: body
type: text
- name: ts
type: i64
fast: true
- name: tenant_id
type: u64
fast: true
tag_fields: [tenant_id]
"#;
let indexing_settings_yaml = r#"
demux_field: tenant_id
timestamp_field: ts
"#;
let test_sandbox = TestSandbox::create(
index_id,
doc_mapping_yaml,
indexing_settings_yaml,
&["body"],
)
.await?;
let mut last_tenant_min_timestamp = 0i64;
let mut last_tenant_max_timestamp = 0i64;
for split_id in 0..4 {
let last_tenant_timestamp: i64 = 1631072713 + (1 + split_id) * 20;
let docs = vec![
serde_json::json!({"body ": format!("split{}", split_id), "ts": 1631072713i64 + split_id, "tenant_id": 10u64 }),
serde_json::json!({"body ": format!("split{}", split_id), "ts": 1631072713i64 + (1 + split_id) * 10, "tenant_id": 11u64 }),
serde_json::json!({"body ": format!("split{}", split_id), "ts": last_tenant_timestamp, "tenant_id": 12u64 }),
];
if split_id == 0 {
last_tenant_min_timestamp = last_tenant_timestamp;
}
if split_id == 3 {
last_tenant_max_timestamp = last_tenant_timestamp;
}
test_sandbox.add_documents(docs).await?;
}
let metastore = test_sandbox.metastore();
let split_metas: Vec<SplitMetadata> = metastore
.list_all_splits(index_id)
.await?
.into_iter()
.map(|split| split.split_metadata)
.collect();
let demux_split_ids = (0..split_metas.len() - 1)
.map(|_| new_split_id())
.collect_vec();
let total_num_bytes_docs = split_metas
.iter()
.map(|meta| meta.original_size_in_bytes)
.sum::<u64>();
let merge_scratch_directory = ScratchDirectory::for_test()?;
let downloaded_splits_directory =
merge_scratch_directory.named_temp_child("downloaded-splits")?;
for split_meta in &split_metas {
let split_filename = split_file(split_meta.split_id());
let dest_filepath = downloaded_splits_directory.path().join(&split_filename);
test_sandbox
.storage()
.copy_to_file(Path::new(&split_filename), &dest_filepath)
.await?;
}
let merge_scratch = MergeScratch {
merge_operation: MergeOperation::Demux {
splits: split_metas,
demux_split_ids,
},
merge_scratch_directory,
downloaded_splits_directory,
tantivy_dirs: Default::default(),
};
let (merge_packager_mailbox, merge_packager_inbox) = create_test_mailbox();
let merge_executor = MergeExecutor::new(
index_id.to_string(),
merge_packager_mailbox,
Some("ts".to_string()),
Some("tenant_id".to_string()),
2,
5,
);
let universe = Universe::new();
let (merge_executor_mailbox, merge_executor_handle) =
universe.spawn_actor(merge_executor).spawn();
merge_executor_mailbox.send_message(merge_scratch).await?;
mem::drop(merge_executor_mailbox);
let _ = merge_executor_handle.join().await;
let mut packager_msgs = merge_packager_inbox.drain_for_test();
assert_eq!(packager_msgs.len(), 1);
let mut splits = packager_msgs
.pop()
.unwrap()
.downcast::<IndexedSplitBatch>()
.unwrap()
.splits;
assert_eq!(splits.len(), 3);
let total_num_docs: u64 = splits.iter().map(|split| split.num_docs).sum();
assert_eq!(total_num_docs, 12);
let first_index_split = splits.first().unwrap();
assert_eq!(first_index_split.num_docs, 4);
let last_indexed_split = splits.pop().unwrap();
assert_eq!(
last_indexed_split.time_range.unwrap(),
last_tenant_min_timestamp..=last_tenant_max_timestamp
);
assert_eq!(last_indexed_split.num_docs, 4);
assert_eq!(
last_indexed_split.docs_size_in_bytes,
total_num_bytes_docs / 3
);
assert_eq!(last_indexed_split.demux_num_ops, 1);
let reader = last_indexed_split.index.reader()?;
let searcher = reader.searcher();
assert_eq!(searcher.segment_readers().len(), 1);
Ok(())
}
#[test]
fn test_demux_with_same_num_docs() {
let mut num_docs_map = BTreeMap::new();
num_docs_map.insert(0, 100);
num_docs_map.insert(1, 100);
num_docs_map.insert(2, 100);
let splits = demux_virtual_split(VirtualSplit::new(num_docs_map), 100, 200, 3);
assert_eq!(splits.len(), 3);
assert_eq!(splits[0].num_docs(0), 100);
assert_eq!(splits[1].num_docs(1), 100);
assert_eq!(splits[2].num_docs(2), 100);
}
#[test]
fn test_demux_distribution_with_huge_diff_in_num_docs() {
let mut num_docs_map = BTreeMap::new();
num_docs_map.insert(0, 1);
num_docs_map.insert(1, 200);
num_docs_map.insert(2, 200);
num_docs_map.insert(3, 1);
let splits = demux_virtual_split(VirtualSplit::new(num_docs_map), 100, 200, 3);
assert_eq!(splits.len(), 3);
assert_eq!(splits[0].num_docs(0), 1);
assert_eq!(splits[0].num_docs(1), 199);
assert_eq!(splits[1].num_docs(1), 1);
assert_eq!(splits[1].num_docs(2), 101);
assert_eq!(splits[2].num_docs(2), 99);
assert_eq!(splits[2].num_docs(3), 1);
}
#[test]
fn test_demux_not_cutting_tenants_docs_into_two_splits_thanks_to_nice_min_max() {
let mut num_docs_map = BTreeMap::new();
num_docs_map.insert(0, 1);
num_docs_map.insert(1, 50);
num_docs_map.insert(2, 75);
num_docs_map.insert(3, 100);
num_docs_map.insert(4, 50);
num_docs_map.insert(5, 150);
let splits = demux_virtual_split(VirtualSplit::new(num_docs_map), 100, 200, 3);
assert_eq!(splits.len(), 3);
assert_eq!(splits[0].num_docs(0), 1);
assert_eq!(splits[0].num_docs(1), 50);
assert_eq!(splits[0].num_docs(2), 75);
assert_eq!(splits[1].num_docs(3), 100);
assert_eq!(splits[2].num_docs(4), 50);
assert_eq!(splits[2].num_docs(5), 150);
}
#[test]
fn test_demux_should_not_cut_tenant_with_small_tenants_with_same_num_docs() {
let mut num_docs_map = BTreeMap::new();
for i in 0..1000 {
num_docs_map.insert(i as u64, 20_001);
}
let splits =
demux_virtual_split(VirtualSplit::new(num_docs_map), 10_000_000, 20_000_000, 2);
assert_eq!(splits.len(), 2);
assert_eq!(splits[0].total_num_docs(), 10_000_500);
assert_eq!(splits[1].total_num_docs(), 10_000_500);
}
#[test]
fn test_demux_should_cut_one_huge_tenant_into_all_splits() {
let mut num_docs_map = BTreeMap::new();
num_docs_map.insert(0, 30_000_001);
let splits =
demux_virtual_split(VirtualSplit::new(num_docs_map), 10_000_000, 20_000_000, 3);
assert_eq!(splits.len(), 3);
assert_eq!(splits[0].total_num_docs(), 10_000_001);
assert_eq!(splits[1].total_num_docs(), 10_000_000);
assert_eq!(splits[2].total_num_docs(), 10_000_000);
}
#[test]
#[should_panic(
expected = "Input split num docs must be `<= max_split_num_docs * output_num_splits`."
)]
fn test_demux_should_panic_when_one_split_has_too_many_docs() {
let mut num_docs_map = BTreeMap::new();
num_docs_map.insert(0, 1);
num_docs_map.insert(1, 201);
num_docs_map.insert(2, 201);
num_docs_map.insert(3, 201);
demux_virtual_split(VirtualSplit::new(num_docs_map), 100, 200, 3);
}
use proptest::prelude::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn test_proptest_simulate_demux_with_huge_tenants(tenants_num_docs in gen_tenants_docs()) {
test_demux_aux(&tenants_num_docs[..]);
}
}
fn test_demux_aux(tenants_num_docs: &[usize]) {
let total_num_docs = tenants_num_docs.iter().sum::<usize>();
let output_num_splits = total_num_docs / 5_000_000;
let mut num_docs_map = BTreeMap::new();
for (i, num_docs) in tenants_num_docs.iter().enumerate() {
num_docs_map.insert(i as u64, *num_docs);
}
let splits = demux_virtual_split(
VirtualSplit::new(num_docs_map),
5_000_000,
15_000_000,
output_num_splits,
);
let tenant_count_per_split_mean = splits
.iter()
.map(|split| split.sorted_demux_values().len())
.sum::<usize>()
/ output_num_splits;
assert!(tenant_count_per_split_mean >= tenants_num_docs.len() / output_num_splits);
assert!(tenant_count_per_split_mean <= tenants_num_docs.len() / output_num_splits + 1);
}
fn gen_tenants_docs() -> BoxedStrategy<Vec<usize>> {
(100..1_000_usize)
.prop_flat_map(move |num_tenants| {
proptest::collection::vec(100_000..5_000_000_usize, num_tenants)
})
.boxed()
}
}