use std::ops::RangeInclusive;
use std::sync::Arc;
use anyhow::Context;
use async_trait::async_trait;
use fail::fail_point;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorRunner, Handler, Mailbox, QueueCapacity,
};
use quickwit_config::IndexingSettings;
use quickwit_doc_mapper::{DocMapper, DocParsingError, SortBy, QUICKWIT_TOKENIZER_MANAGER};
use quickwit_metastore::Metastore;
use tantivy::schema::{Field, Value};
use tantivy::{Document, IndexBuilder, IndexSettings, IndexSortByField};
use tracing::{info, warn};
use crate::actors::Packager;
use crate::models::{IndexedSplit, IndexedSplitBatch, IndexingDirectory, RawDocBatch};
#[derive(Debug)]
struct CommitTimeout {
split_id: String,
}
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct IndexerCounters {
pub num_parse_errors: u64,
pub num_missing_fields: u64,
pub num_valid_docs: u64,
pub num_splits_emitted: u64,
pub overall_num_bytes: u64,
pub num_docs_in_split: u64,
}
impl IndexerCounters {
pub fn num_processed_docs(&self) -> u64 {
self.num_valid_docs + self.num_parse_errors + self.num_missing_fields
}
pub fn num_invalid_docs(&self) -> u64 {
self.num_parse_errors + self.num_missing_fields
}
}
struct IndexerState {
index_id: String,
doc_mapper: Arc<dyn DocMapper>,
indexing_directory: IndexingDirectory,
indexing_settings: IndexingSettings,
timestamp_field_opt: Option<Field>,
sort_by_field_opt: Option<IndexSortByField>,
}
enum PrepareDocumentOutcome {
ParsingError,
MissingField,
Document {
document: Document,
timestamp_opt: Option<i64>,
},
}
impl IndexerState {
fn create_indexed_split(&self, ctx: &ActorContext<Indexer>) -> anyhow::Result<IndexedSplit> {
let schema = self.doc_mapper.schema();
let index_settings = IndexSettings {
sort_by_field: self.sort_by_field_opt.clone(),
..Default::default()
};
let index_builder = IndexBuilder::new()
.settings(index_settings)
.schema(schema)
.tokenizers(QUICKWIT_TOKENIZER_MANAGER.clone());
let indexed_split = IndexedSplit::new_in_dir(
self.index_id.clone(),
self.indexing_directory.scratch_directory.clone(),
self.indexing_settings.resources.clone(),
index_builder,
ctx.progress().clone(),
ctx.kill_switch().clone(),
)?;
info!(split_id = %indexed_split.split_id, "new-split");
Ok(indexed_split)
}
async fn get_or_create_current_indexed_split<'a>(
&self,
current_split_opt: &'a mut Option<IndexedSplit>,
ctx: &ActorContext<Indexer>,
) -> anyhow::Result<&'a mut IndexedSplit> {
if current_split_opt.is_none() {
let new_indexed_split = self.create_indexed_split(ctx)?;
let commit_timeout_message = CommitTimeout {
split_id: new_indexed_split.split_id.clone(),
};
ctx.schedule_self_msg(
self.indexing_settings.commit_timeout(),
commit_timeout_message,
)
.await;
*current_split_opt = Some(new_indexed_split);
}
let current_index_split = current_split_opt.as_mut().with_context(|| {
"No index writer available. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues."
})?;
Ok(current_index_split)
}
fn prepare_document(&self, doc_json: String) -> PrepareDocumentOutcome {
let doc_parsing_result = self.doc_mapper.doc_from_json(doc_json);
let document = match doc_parsing_result {
Ok(doc) => doc,
Err(doc_parsing_error) => {
warn!(err=?doc_parsing_error);
return match doc_parsing_error {
DocParsingError::RequiredFastField(_) => PrepareDocumentOutcome::MissingField,
_ => PrepareDocumentOutcome::ParsingError,
};
}
};
let timestamp_field = if let Some(timestamp_field) = self.timestamp_field_opt {
timestamp_field
} else {
return PrepareDocumentOutcome::Document {
document,
timestamp_opt: None,
};
};
let timestamp_opt = document.get_first(timestamp_field).and_then(Value::as_i64);
assert!(
timestamp_opt.is_some(),
"We should always have a timestamp here as doc parsing returns a `RequiredFastField` \
error on a missing timestamp."
);
PrepareDocumentOutcome::Document {
document,
timestamp_opt,
}
}
async fn process_batch(
&self,
batch: RawDocBatch,
current_split_opt: &mut Option<IndexedSplit>,
counters: &mut IndexerCounters,
ctx: &ActorContext<Indexer>,
) -> Result<(), ActorExitStatus> {
let indexed_split = self
.get_or_create_current_indexed_split(current_split_opt, ctx)
.await?;
indexed_split
.checkpoint_delta
.extend(batch.checkpoint_delta)
.with_context(|| "Batch delta does not follow indexer checkpoint")?;
for doc_json in batch.docs {
counters.overall_num_bytes += doc_json.len() as u64;
indexed_split.docs_size_in_bytes += doc_json.len() as u64;
let prepared_doc = {
let _protect_zone = ctx.protect_zone();
self.prepare_document(doc_json)
};
match prepared_doc {
PrepareDocumentOutcome::ParsingError => {
counters.num_parse_errors += 1;
}
PrepareDocumentOutcome::MissingField => {
counters.num_missing_fields += 1;
}
PrepareDocumentOutcome::Document {
document,
timestamp_opt,
} => {
counters.num_docs_in_split += 1;
counters.num_valid_docs += 1;
indexed_split.num_docs += 1;
if let Some(timestamp) = timestamp_opt {
record_timestamp(timestamp, &mut indexed_split.time_range);
}
let _protect_guard = ctx.protect_zone();
indexed_split
.index_writer
.add_document(document)
.with_context(|| "Failed to add document.")?;
}
}
ctx.record_progress();
}
Ok(())
}
}
pub struct Indexer {
indexer_state: IndexerState,
packager_mailbox: Mailbox<Packager>,
current_split_opt: Option<IndexedSplit>,
source_id: String,
metastore: Arc<dyn Metastore>,
counters: IndexerCounters,
}
#[async_trait]
impl Actor for Indexer {
type ObservableState = IndexerCounters;
fn observable_state(&self) -> Self::ObservableState {
self.counters.clone()
}
fn queue_capacity(&self) -> QueueCapacity {
QueueCapacity::Bounded(10)
}
fn name(&self) -> String {
"Indexer".to_string()
}
fn runner(&self) -> ActorRunner {
ActorRunner::DedicatedThread
}
async fn finalize(
&mut self,
exit_status: &ActorExitStatus,
ctx: &ActorContext<Self>,
) -> anyhow::Result<()> {
match exit_status {
ActorExitStatus::DownstreamClosed
| ActorExitStatus::Killed
| ActorExitStatus::Failure(_)
| ActorExitStatus::Panicked => return Ok(()),
ActorExitStatus::Quit | ActorExitStatus::Success => {
self.send_to_packager(CommitTrigger::NoMoreDocs, ctx)
.await?;
}
}
Ok(())
}
}
fn record_timestamp(timestamp: i64, time_range: &mut Option<RangeInclusive<i64>>) {
let new_timestamp_range = match time_range.as_ref() {
Some(range) => {
RangeInclusive::new(timestamp.min(*range.start()), timestamp.max(*range.end()))
}
None => RangeInclusive::new(timestamp, timestamp),
};
*time_range = Some(new_timestamp_range);
}
#[async_trait]
impl Handler<CommitTimeout> for Indexer {
type Reply = ();
async fn handle(
&mut self,
commit_timeout: CommitTimeout,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.process_commit_timeout(&commit_timeout.split_id, ctx)
.await?;
Ok(())
}
}
#[async_trait]
impl Handler<RawDocBatch> for Indexer {
type Reply = ();
async fn handle(
&mut self,
batch: RawDocBatch,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.process_batch(batch, ctx).await
}
}
#[derive(Debug, Clone, Copy)]
enum CommitTrigger {
Timeout,
NoMoreDocs,
NumDocsLimit,
}
impl Indexer {
pub fn new(
index_id: String,
doc_mapper: Arc<dyn DocMapper>,
source_id: String,
metastore: Arc<dyn Metastore>,
indexing_directory: IndexingDirectory,
indexing_settings: IndexingSettings,
packager_mailbox: Mailbox<Packager>,
) -> Self {
let schema = doc_mapper.schema();
let timestamp_field_opt = doc_mapper.timestamp_field(&schema);
let sort_by_field_opt = match indexing_settings.sort_by() {
SortBy::DocId => None,
SortBy::FastField { field_name, order } => Some(IndexSortByField {
field: field_name,
order: order.into(),
}),
};
Self {
indexer_state: IndexerState {
index_id,
doc_mapper,
indexing_directory,
indexing_settings,
timestamp_field_opt,
sort_by_field_opt,
},
packager_mailbox,
current_split_opt: None,
source_id,
metastore,
counters: IndexerCounters::default(),
}
}
async fn process_batch(
&mut self,
batch: RawDocBatch,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
fail_point!("indexer:batch:before");
self.indexer_state
.process_batch(batch, &mut self.current_split_opt, &mut self.counters, ctx)
.await?;
if self.counters.num_docs_in_split
>= self.indexer_state.indexing_settings.split_num_docs_target as u64
{
self.send_to_packager(CommitTrigger::NumDocsLimit, ctx)
.await?;
}
fail_point!("indexer:batch:after");
Ok(())
}
async fn process_commit_timeout(
&mut self,
split_id: &str,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if let Some(split) = self.current_split_opt.as_ref() {
if split.split_id != split_id {
return Ok(());
}
}
self.send_to_packager(CommitTrigger::Timeout, ctx).await?;
Ok(())
}
async fn send_to_packager(
&mut self,
commit_trigger: CommitTrigger,
ctx: &ActorContext<Self>,
) -> anyhow::Result<()> {
let indexed_split = if let Some(indexed_split) = self.current_split_opt.take() {
indexed_split
} else {
return Ok(());
};
if indexed_split.num_docs == 0 {
self.metastore
.publish_splits(
&indexed_split.index_id,
&self.source_id,
&[],
indexed_split.checkpoint_delta,
)
.await
.with_context(|| {
format!(
"Failed to update the checkpoint for {}, {} after a split containing only \
errors.",
&indexed_split.index_id, &self.source_id
)
})?;
return Ok(());
}
info!(commit_trigger=?commit_trigger, split=?indexed_split.split_id, num_docs=self.counters.num_docs_in_split, "send-to-packager");
ctx.send_message(
&self.packager_mailbox,
IndexedSplitBatch {
splits: vec![indexed_split],
},
)
.await?;
self.counters.num_docs_in_split = 0;
self.counters.num_splits_emitted += 1;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use quickwit_actors::{create_test_mailbox, Universe};
use quickwit_doc_mapper::SortOrder;
use quickwit_metastore::checkpoint::CheckpointDelta;
use quickwit_metastore::MockMetastore;
use super::*;
use crate::actors::indexer::{record_timestamp, IndexerCounters};
use crate::models::{IndexingDirectory, RawDocBatch};
#[test]
fn test_record_timestamp() {
let mut time_range = None;
record_timestamp(1628664679, &mut time_range);
assert_eq!(time_range, Some(1628664679..=1628664679));
record_timestamp(1628664112, &mut time_range);
assert_eq!(time_range, Some(1628664112..=1628664679));
record_timestamp(1628665112, &mut time_range);
assert_eq!(time_range, Some(1628664112..=1628665112))
}
#[tokio::test]
async fn test_indexer_simple() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let doc_mapper = Arc::new(quickwit_doc_mapper::default_doc_mapper_for_tests());
let indexing_directory = IndexingDirectory::for_test().await?;
let mut indexing_settings = IndexingSettings::for_test();
indexing_settings.split_num_docs_target = 3;
indexing_settings.sort_field = Some("timestamp".to_string());
indexing_settings.sort_order = Some(SortOrder::Desc);
indexing_settings.timestamp_field = Some("timestamp".to_string());
let (mailbox, inbox) = create_test_mailbox();
let mut metastore = MockMetastore::default();
metastore
.expect_publish_splits()
.returning(move |_, _, splits, _| {
assert!(splits.is_empty());
Ok(())
});
let indexer = Indexer::new(
"test-index".to_string(),
doc_mapper,
"source-id".to_string(),
Arc::new(metastore),
indexing_directory,
indexing_settings,
mailbox,
);
let universe = Universe::new();
let (indexer_mailbox, indexer_handle) = universe.spawn_actor(indexer).spawn();
indexer_mailbox
.send_message(RawDocBatch {
docs: vec![
r#"{"body": "happy", "response_date": 1652866573228, "response_time": 12, "response_payload": "YWJj"}"#.to_string(), r#"{"body": "happy", "timestamp": 1628837062, "response_date": 1652866573229, "response_time": 2, "response_payload": "YWJj"}"#.to_string(), r#"{"body": "happy2", "timestamp": 1628837062, "response_date": 1652866573232, "response_time": 13, "response_payload": "YWJj"}"#.to_string(), "{".to_string(), ],
checkpoint_delta: CheckpointDelta::from(0..4),
})
.await?;
let indexer_counters = indexer_handle.process_pending_and_observe().await.state;
assert_eq!(
indexer_counters,
IndexerCounters {
num_parse_errors: 1,
num_missing_fields: 1,
num_valid_docs: 2,
num_splits_emitted: 0,
num_docs_in_split: 2, overall_num_bytes: 345
}
);
indexer_mailbox
.send_message(
RawDocBatch {
docs: vec![r#"{"body": "happy3", "timestamp": 1628837062, "response_date": 1652866573227, "response_time": 12, "response_payload": "YWJj"}"#.to_string()],
checkpoint_delta: CheckpointDelta::from(4..5),
}
)
.await?;
let indexer_counters = indexer_handle.process_pending_and_observe().await.state;
assert_eq!(
indexer_counters,
IndexerCounters {
num_parse_errors: 1,
num_missing_fields: 1,
num_valid_docs: 3,
num_splits_emitted: 1,
num_docs_in_split: 0, overall_num_bytes: 469
}
);
let output_messages = inbox.drain_for_test();
assert_eq!(output_messages.len(), 1);
let batch = output_messages[0]
.downcast_ref::<IndexedSplitBatch>()
.unwrap();
assert_eq!(batch.splits[0].num_docs, 3);
let sort_by_field = batch.splits[0].index.settings().sort_by_field.as_ref();
assert!(sort_by_field.is_some());
assert_eq!(sort_by_field.unwrap().field, "timestamp");
assert!(sort_by_field.unwrap().order.is_desc());
Ok(())
}
#[tokio::test]
async fn test_indexer_timeout() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let doc_mapper = Arc::new(quickwit_doc_mapper::default_doc_mapper_for_tests());
let indexing_directory = IndexingDirectory::for_test().await?;
let indexing_settings = IndexingSettings::for_test();
let (mailbox, inbox) = create_test_mailbox();
let mut metastore = MockMetastore::default();
metastore
.expect_publish_splits()
.returning(move |_, _, splits, _| {
assert!(splits.is_empty());
Ok(())
});
let indexer = Indexer::new(
"test-index".to_string(),
doc_mapper,
"source-id".to_string(),
Arc::new(metastore),
indexing_directory,
indexing_settings,
mailbox,
);
let universe = Universe::new();
let (indexer_mailbox, indexer_handle) = universe.spawn_actor(indexer).spawn();
indexer_mailbox
.send_message(
RawDocBatch {
docs: vec![r#"{"body": "happy", "timestamp": 1628837062, "response_date": 1652866573228, "response_time": 12, "response_payload": "YWJj"}"#.to_string()],
checkpoint_delta: CheckpointDelta::from(0..1),
}
)
.await?;
let indexer_counters = indexer_handle.process_pending_and_observe().await.state;
assert_eq!(
indexer_counters,
IndexerCounters {
num_parse_errors: 0,
num_missing_fields: 0,
num_valid_docs: 1,
num_splits_emitted: 0,
num_docs_in_split: 1,
overall_num_bytes: 123
}
);
universe.simulate_time_shift(Duration::from_secs(61)).await;
let indexer_counters = indexer_handle.process_pending_and_observe().await.state;
assert_eq!(
indexer_counters,
IndexerCounters {
num_parse_errors: 0,
num_missing_fields: 0,
num_valid_docs: 1,
num_splits_emitted: 1,
num_docs_in_split: 0,
overall_num_bytes: 123
}
);
let output_messages = inbox.drain_for_test();
assert_eq!(output_messages.len(), 1);
let indexed_split_batch = output_messages[0]
.downcast_ref::<IndexedSplitBatch>()
.unwrap();
assert_eq!(indexed_split_batch.splits[0].num_docs, 1);
Ok(())
}
#[tokio::test]
async fn test_indexer_eof() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let doc_mapper = Arc::new(quickwit_doc_mapper::default_doc_mapper_for_tests());
let indexing_directory = IndexingDirectory::for_test().await?;
let indexing_settings = IndexingSettings::for_test();
let (mailbox, inbox) = create_test_mailbox();
let mut metastore = MockMetastore::default();
metastore
.expect_publish_splits()
.returning(move |_, _, splits, _| {
assert!(splits.is_empty());
Ok(())
});
let indexer = Indexer::new(
"test-index".to_string(),
doc_mapper,
"source-id".to_string(),
Arc::new(metastore),
indexing_directory,
indexing_settings,
mailbox,
);
let universe = Universe::new();
let (indexer_mailbox, indexer_handle) = universe.spawn_actor(indexer).spawn();
indexer_mailbox
.send_message(
RawDocBatch {
docs: vec![r#"{"body": "happy", "timestamp": 1628837062, "response_date": 1652866573227, "response_time": 12, "response_payload": "YWJj"}"#.to_string()],
checkpoint_delta: CheckpointDelta::from(0..1),
}
)
.await?;
universe.send_exit_with_success(&indexer_mailbox).await?;
let (exit_status, indexer_counters) = indexer_handle.join().await;
assert!(exit_status.is_success());
assert_eq!(
indexer_counters,
IndexerCounters {
num_parse_errors: 0,
num_missing_fields: 0,
num_valid_docs: 1,
num_splits_emitted: 1,
num_docs_in_split: 0,
overall_num_bytes: 123
}
);
let output_messages = inbox.drain_for_test();
assert_eq!(output_messages.len(), 1);
assert_eq!(
output_messages[0]
.downcast_ref::<IndexedSplitBatch>()
.unwrap()
.splits[0]
.num_docs,
1
);
Ok(())
}
}