use std::collections::BTreeSet;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, Context};
use async_trait::async_trait;
use fail::fail_point;
use itertools::Itertools;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorRunner, Handler, Mailbox, QueueCapacity,
};
use quickwit_directories::write_hotcache;
use quickwit_doc_mapper::tag_pruning::append_to_tag_set;
use tantivy::schema::FieldType;
use tantivy::{InvertedIndexReader, ReloadPolicy, SegmentId, SegmentMeta};
use tracing::{debug, info, info_span, warn, Span};
use super::NamedField;
use crate::actors::Uploader;
use crate::models::{
    IndexedSplit, IndexedSplitBatch, PackagedSplit, PackagedSplitBatch, ScratchDirectory,
};

const MAX_VALUES_PER_TAG_FIELD: usize = if cfg!(any(test, feature = "testsuite")) {
    6
} else {
    1000
};
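/// The packager turns an [`IndexedSplit`] into a [`PackagedSplit`] ready to be uploaded.
///
/// For each split of an [`IndexedSplitBatch`], it:
/// - commits the tantivy index,
/// - merges its segments into a single one if required,
/// - extracts the values of the configured tag fields,
/// - builds the hotcache,
/// - forwards the resulting [`PackagedSplitBatch`] to the uploader.
///
/// The sketch below shows how the actor is typically wired up. It mirrors the test
/// setup at the bottom of this file and assumes a `Universe` named `universe` and a
/// `Vec<NamedField>` named `tag_fields` are in scope; it is not a doctest.
///
/// ```ignore
/// let (uploader_mailbox, _uploader_inbox) = create_test_mailbox();
/// let packager = Packager::new("Packager", tag_fields, uploader_mailbox);
/// let (_packager_mailbox, _packager_handle) = universe.spawn_actor(packager).spawn();
/// ```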
pub struct Packager {
actor_name: &'static str,
uploader_mailbox: Mailbox<Uploader>,
tag_fields: Vec<NamedField>,
}
impl Packager {
pub fn new(
actor_name: &'static str,
tag_fields: Vec<NamedField>,
uploader_mailbox: Mailbox<Uploader>,
) -> Packager {
Packager {
actor_name,
uploader_mailbox,
tag_fields,
}
}
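    /// Commits the split, merges its segments if a merge is required, and assembles
    /// the resulting [`PackagedSplit`].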
pub async fn process_indexed_split(
&self,
mut split: IndexedSplit,
ctx: &ActorContext<Self>,
) -> anyhow::Result<PackagedSplit> {
commit_split(&mut split, ctx)?;
let segment_metas = merge_segments_if_required(&mut split, ctx).await?;
let packaged_split =
create_packaged_split(&segment_metas[..], split, &self.tag_fields, ctx)?;
Ok(packaged_split)
}
}
#[async_trait]
impl Actor for Packager {
type ObservableState = ();
#[allow(clippy::unused_unit)]
fn observable_state(&self) -> Self::ObservableState {
()
}
fn queue_capacity(&self) -> QueueCapacity {
QueueCapacity::Bounded(0)
}
fn name(&self) -> String {
self.actor_name.to_string()
}
fn runner(&self) -> ActorRunner {
ActorRunner::DedicatedThread
}
}
#[async_trait]
impl Handler<IndexedSplitBatch> for Packager {
type Reply = ();
fn message_span(&self, msg_id: u64, batch: &IndexedSplitBatch) -> Span {
info_span!("", msg_id=&msg_id, num_splits=%batch.splits.len())
}
async fn handle(
&mut self,
batch: IndexedSplitBatch,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
info!(split_ids=?batch.splits.iter().map(|split| split.split_id.clone()).collect_vec(), "start-packaging-splits");
for split in &batch.splits {
if let Some(controlled_directory) = split.controlled_directory_opt.as_ref() {
controlled_directory.set_progress_and_kill_switch(
ctx.progress().clone(),
ctx.kill_switch().clone(),
);
}
}
fail_point!("packager:before");
let mut packaged_splits = Vec::new();
for split in batch.splits {
let packaged_split = self.process_indexed_split(split, ctx).await?;
packaged_splits.push(packaged_split);
}
ctx.send_message(
&self.uploader_mailbox,
PackagedSplitBatch::new(packaged_splits),
)
.await?;
fail_point!("packager:after");
Ok(())
}
}
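/// A merge is required when the split contains more than one segment, or when its
/// single segment holds deleted documents.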
fn is_merge_required(segment_metas: &[SegmentMeta]) -> bool {
match &segment_metas {
[] => false,
[segment_meta] => segment_meta.has_deletes(),
_ => true,
}
}
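/// Commits the split's index writer. The commit runs inside a protect zone so that
/// the actor is not considered unresponsive while this potentially long operation runs.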
fn commit_split(split: &mut IndexedSplit, ctx: &ActorContext<Packager>) -> anyhow::Result<()> {
info!(split_id=%split.split_id, "commit-split");
let _protect_guard = ctx.protect_zone();
split
.index_writer
.commit()
.with_context(|| format!("Commit split `{:?}` failed", &split))?;
Ok(())
}
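/// Returns the sorted list of files that make up the split: `meta.json` plus every
/// segment file listed in the segment metas that is actually present in the scratch
/// directory.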
fn list_split_files(
segment_metas: &[SegmentMeta],
scratch_directory: &ScratchDirectory,
) -> Vec<PathBuf> {
let mut index_files = vec![scratch_directory.path().join("meta.json")];
for segment_meta in segment_metas {
for relative_path in segment_meta.list_files() {
let filepath = scratch_directory.path().join(&relative_path);
if filepath.exists() {
index_files.push(filepath);
}
}
}
index_files.sort();
index_files
}
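/// Merges all searchable segments of the split into a single segment if a merge is
/// required, and returns the segment metas observed after the (optional) merge.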
async fn merge_segments_if_required(
split: &mut IndexedSplit,
ctx: &ActorContext<Packager>,
) -> anyhow::Result<Vec<SegmentMeta>> {
debug!(
split_id = split.split_id.as_str(),
"merge-segments-if-required"
);
let segment_metas_before_merge = split.index.searchable_segment_metas()?;
if is_merge_required(&segment_metas_before_merge[..]) {
let segment_ids: Vec<SegmentId> = segment_metas_before_merge
.into_iter()
.map(|segment_meta| segment_meta.id())
.collect();
info!(split_id=split.split_id.as_str(), segment_ids=?segment_ids, "merging-segments");
let _protected_zone_guard = ctx.protect_zone();
split.index_writer.merge(&segment_ids).await?;
}
let segment_metas_after_merge: Vec<SegmentMeta> = split.index.searchable_segment_metas()?;
Ok(segment_metas_after_merge)
}
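/// Builds the hotcache for the split stored at `split_path` and writes it to `out`.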
fn build_hotcache<W: io::Write>(split_path: &Path, out: &mut W) -> anyhow::Result<()> {
let mmap_directory = tantivy::directory::MmapDirectory::open(split_path)?;
write_hotcache(mmap_directory, out)?;
Ok(())
}
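/// Attempts to extract all terms of a tag field across the given inverted indexes.
///
/// Fails if the total number of unique terms exceeds `max_terms`, or if the field is
/// a `bytes` field, in which case no tag values are collected for this field.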
fn try_extract_terms(
named_field: &NamedField,
inv_indexes: &[Arc<InvertedIndexReader>],
max_terms: usize,
) -> anyhow::Result<Vec<String>> {
let num_terms = inv_indexes
.iter()
.map(|inv_index| inv_index.terms().num_terms())
.sum::<usize>();
if num_terms > max_terms {
bail!(
"Number of unique terms for tag field {} > {}.",
named_field.name,
max_terms
);
}
let mut terms = Vec::with_capacity(num_terms);
for inv_index in inv_indexes {
let mut terms_streamer = inv_index.terms().stream()?;
while let Some((term_data, _)) = terms_streamer.next() {
let term = match named_field.field_type {
FieldType::U64(_) => u64_from_term_data(term_data)?.to_string(),
FieldType::I64(_) => {
tantivy::u64_to_i64(u64_from_term_data(term_data)?).to_string()
}
FieldType::F64(_) => {
tantivy::u64_to_f64(u64_from_term_data(term_data)?).to_string()
}
FieldType::Bytes(_) => {
bail!("Tags collection is not allowed on `bytes` fields.")
}
_ => std::str::from_utf8(term_data)?.to_string(),
};
terms.push(term);
}
}
Ok(terms)
}
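/// Assembles the final [`PackagedSplit`]: lists the split files, extracts the tag
/// values, builds the hotcache, and gathers the split metadata.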
fn create_packaged_split(
segment_metas: &[SegmentMeta],
split: IndexedSplit,
tag_fields: &[NamedField],
ctx: &ActorContext<Packager>,
) -> anyhow::Result<PackagedSplit> {
info!(split_id = split.split_id.as_str(), "create-packaged-split");
let split_files = list_split_files(segment_metas, &split.split_scratch_directory);
let num_docs = segment_metas
.iter()
.map(|segment_meta| segment_meta.num_docs() as u64)
.sum();
debug!(split_id = split.split_id.as_str(), tag_fields =? tag_fields, "extract-tags-values");
let index_reader = split
.index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let mut tags = BTreeSet::default();
for named_field in tag_fields {
let inverted_indexes = index_reader
.searcher()
.segment_readers()
.iter()
.map(|segment| segment.inverted_index(named_field.field))
.collect::<Result<Vec<_>, _>>()?;
match try_extract_terms(named_field, &inverted_indexes, MAX_VALUES_PER_TAG_FIELD) {
Ok(terms) => {
append_to_tag_set(&named_field.name, &terms, &mut tags);
}
Err(tag_extraction_error) => {
                warn!(field=%named_field.name, err=?tag_extraction_error, "Failed to extract tag values for field. No values will be registered in the split metadata.");
}
}
}
ctx.record_progress();
debug!(split_id = split.split_id.as_str(), "build-hotcache");
let mut hotcache_bytes = vec![];
build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?;
ctx.record_progress();
let packaged_split = PackagedSplit {
split_id: split.split_id.to_string(),
replaced_split_ids: split.replaced_split_ids,
index_id: split.index_id,
checkpoint_deltas: vec![split.checkpoint_delta],
split_scratch_directory: split.split_scratch_directory,
num_docs,
demux_num_ops: split.demux_num_ops,
time_range: split.time_range,
size_in_bytes: split.docs_size_in_bytes,
tags,
split_date_of_birth: split.split_date_of_birth,
split_files,
hotcache_bytes,
};
Ok(packaged_split)
}
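/// Interprets the first 8 bytes of the term data as a big-endian `u64`.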
fn u64_from_term_data(data: &[u8]) -> anyhow::Result<u64> {
    let u64_bytes: [u8; 8] = data
        .get(0..8)
        .and_then(|bytes| bytes.try_into().ok())
        .context("Could not interpret term bytes as u64: expected at least 8 bytes")?;
Ok(u64::from_be_bytes(u64_bytes))
}
#[cfg(test)]
mod tests {
use std::ops::RangeInclusive;
use std::time::Instant;
use quickwit_actors::{create_test_mailbox, ObservationType, Universe};
use quickwit_doc_mapper::QUICKWIT_TOKENIZER_MANAGER;
use quickwit_metastore::checkpoint::CheckpointDelta;
use tantivy::schema::{NumericOptions, Schema, FAST, STRING, TEXT};
use tantivy::{doc, Index};
use super::*;
use crate::models::ScratchDirectory;
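    /// Creates an `IndexedSplit` for tests, with one tantivy segment per slice of
    /// timestamps in `segments_timestamps` and nine documents per timestamp.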
fn make_indexed_split_for_test(segments_timestamps: &[&[i64]]) -> anyhow::Result<IndexedSplit> {
let split_scratch_directory = ScratchDirectory::for_test()?;
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let timestamp_field = schema_builder.add_u64_field("timestamp", FAST);
let tag_str = schema_builder.add_text_field("tag_str", STRING);
let tag_many = schema_builder.add_text_field("tag_many", STRING);
let tag_u64 =
schema_builder.add_u64_field("tag_u64", NumericOptions::default().set_indexed());
let tag_i64 =
schema_builder.add_i64_field("tag_i64", NumericOptions::default().set_indexed());
let tag_f64 =
schema_builder.add_f64_field("tag_f64", NumericOptions::default().set_indexed());
let schema = schema_builder.build();
let mut index = Index::create_in_dir(split_scratch_directory.path(), schema)?;
index.set_tokenizers(QUICKWIT_TOKENIZER_MANAGER.clone());
let mut index_writer = index.writer_with_num_threads(1, 10_000_000)?;
let mut timerange_opt: Option<RangeInclusive<i64>> = None;
let mut num_docs = 0;
for (segment_num, segment_timestamps) in segments_timestamps.iter().enumerate() {
if segment_num > 0 {
index_writer.commit()?;
}
            for &timestamp in segment_timestamps.iter() {
for num in 1..10 {
let doc = doc!(
text_field => format!("timestamp is {}", timestamp),
timestamp_field => timestamp,
tag_str => "value",
tag_many => format!("many-{}", num),
tag_u64 => 42u64,
tag_i64 => -42i64,
tag_f64 => -42.02f64,
);
index_writer.add_document(doc)?;
num_docs += 1;
timerange_opt = Some(
timerange_opt
.map(|timestamp_range| {
let start = timestamp.min(*timestamp_range.start());
let end = timestamp.max(*timestamp_range.end());
RangeInclusive::new(start, end)
})
.unwrap_or_else(|| RangeInclusive::new(timestamp, timestamp)),
)
}
}
}
let indexed_split = IndexedSplit {
split_id: "test-split".to_string(),
index_id: "test-index".to_string(),
time_range: timerange_opt,
demux_num_ops: 0,
num_docs,
            docs_size_in_bytes: num_docs * 15,
            split_date_of_birth: Instant::now(),
index,
index_writer,
split_scratch_directory,
checkpoint_delta: CheckpointDelta::from(10..20),
replaced_split_ids: Vec::new(),
controlled_directory_opt: None,
};
Ok(indexed_split)
}
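    /// Resolves the given field names against the schema and returns them as
    /// `NamedField`s.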
fn get_tag_fields(schema: Schema, field_names: &[&str]) -> Vec<NamedField> {
field_names
.iter()
.map(|field_name| {
let field = schema.get_field(field_name).unwrap();
let field_type = schema.get_field_entry(field).field_type().clone();
NamedField {
name: field_name.to_string(),
field,
field_type,
}
})
.collect()
}
#[tokio::test]
async fn test_packager_no_merge_required() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let universe = Universe::new();
let (mailbox, inbox) = create_test_mailbox();
let indexed_split = make_indexed_split_for_test(&[&[1628203589, 1628203640]])?;
let tag_fields = get_tag_fields(
indexed_split.index.schema(),
&["tag_str", "tag_many", "tag_u64", "tag_i64", "tag_f64"],
);
let packager = Packager::new("TestPackager", tag_fields, mailbox);
let (packager_mailbox, packager_handle) = universe.spawn_actor(packager).spawn();
packager_mailbox
.send_message(IndexedSplitBatch {
splits: vec![indexed_split],
})
.await?;
assert_eq!(
packager_handle.process_pending_and_observe().await.obs_type,
ObservationType::Alive
);
let packaged_splits = inbox.drain_for_test();
assert_eq!(packaged_splits.len(), 1);
let packaged_split = packaged_splits[0]
.downcast_ref::<PackagedSplitBatch>()
.unwrap();
let split = &packaged_split.splits[0];
assert_eq!(
&split.tags.iter().map(|s| s.as_str()).collect::<Vec<&str>>(),
&[
"tag_f64!",
"tag_f64:-42.02",
"tag_i64!",
"tag_i64:-42",
"tag_str!",
"tag_str:value",
"tag_u64!",
"tag_u64:42"
]
);
Ok(())
}
#[tokio::test]
async fn test_packager_merge_required() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let universe = Universe::new();
let (mailbox, inbox) = create_test_mailbox();
let indexed_split = make_indexed_split_for_test(&[&[1628203589], &[1628203640]])?;
let tag_fields = get_tag_fields(indexed_split.index.schema(), &[]);
let packager = Packager::new("TestPackager", tag_fields, mailbox);
let (packager_mailbox, packager_handle) = universe.spawn_actor(packager).spawn();
packager_mailbox
.send_message(IndexedSplitBatch {
splits: vec![indexed_split],
})
.await?;
assert_eq!(
packager_handle.process_pending_and_observe().await.obs_type,
ObservationType::Alive
);
let packaged_splits = inbox.drain_for_test();
assert_eq!(packaged_splits.len(), 1);
Ok(())
}
#[tokio::test]
async fn test_package_two_indexed_split_and_merge_required() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let universe = Universe::new();
let (mailbox, inbox) = create_test_mailbox();
let indexed_split_1 = make_indexed_split_for_test(&[&[1628203589], &[1628203640]])?;
let indexed_split_2 = make_indexed_split_for_test(&[&[1628204589], &[1629203640]])?;
let tag_fields = get_tag_fields(indexed_split_1.index.schema(), &[]);
let packager = Packager::new("TestPackager", tag_fields, mailbox);
let (packager_mailbox, packager_handle) = universe.spawn_actor(packager).spawn();
packager_mailbox
.send_message(IndexedSplitBatch {
splits: vec![indexed_split_1, indexed_split_2],
})
.await?;
assert_eq!(
packager_handle.process_pending_and_observe().await.obs_type,
ObservationType::Alive
);
let packaged_splits = inbox.drain_for_test();
assert_eq!(packaged_splits.len(), 1);
assert_eq!(
packaged_splits[0]
.downcast_ref::<PackagedSplitBatch>()
.unwrap()
.splits
.len(),
2
);
Ok(())
}
}