use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{
create_mailbox, Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, KillSwitch,
QueueCapacity, Supervisable,
};
use quickwit_config::{build_doc_mapper, IndexingSettings, SourceConfig};
use quickwit_doc_mapper::DocMapper;
use quickwit_metastore::{IndexMetadata, Metastore, MetastoreError, SplitState};
use quickwit_storage::Storage;
use tokio::join;
use tracing::{debug, error, info, info_span, instrument, Span};
use crate::actors::merge_split_downloader::MergeSplitDownloader;
use crate::actors::publisher::PublisherType;
use crate::actors::{
GarbageCollector, Indexer, MergeExecutor, MergePlanner, NamedField, Packager, Publisher,
Uploader,
};
use crate::models::{IndexingDirectory, IndexingStatistics, Observe};
use crate::source::{quickwit_supported_sources, SourceActor};
use crate::split_store::{IndexingSplitStore, IndexingSplitStoreParams};
use crate::{MergePolicy, StableMultitenantWithTimestampMergePolicy};
/// Upper bound (10 minutes) on the exponential backoff between pipeline spawn retries.
const MAX_RETRY_DELAY: Duration = Duration::from_secs(600);
/// Handles to every actor of a single pipeline generation.
///
/// The first group is the main indexing chain
/// (source -> indexer -> packager -> uploader -> publisher); the second group
/// is the merge chain fed by the merge planner.
pub struct IndexingPipelineHandler {
    // Indexing chain.
    pub source: ActorHandle<SourceActor>,
    pub indexer: ActorHandle<Indexer>,
    pub packager: ActorHandle<Packager>,
    pub uploader: ActorHandle<Uploader>,
    pub publisher: ActorHandle<Publisher>,
    pub garbage_collector: ActorHandle<GarbageCollector>,
    // Merge chain.
    pub merge_planner: ActorHandle<MergePlanner>,
    pub merge_split_downloader: ActorHandle<MergeSplitDownloader>,
    pub merge_executor: ActorHandle<MergeExecutor>,
    pub merge_packager: ActorHandle<Packager>,
    pub merge_uploader: ActorHandle<Uploader>,
    pub merge_publisher: ActorHandle<Publisher>,
}
/// Self-message periodically sent to the pipeline to check the health of the
/// actors of the current generation.
#[derive(Debug, Clone, Copy)]
pub struct Supervise;
/// Self-message asking the pipeline to spawn a new generation of indexing actors.
#[derive(Debug, Clone, Copy, Default)]
pub struct Spawn {
    // Number of failed spawn attempts so far; drives the retry backoff delay.
    retry_count: usize,
}
/// Supervisor actor of one indexing pipeline: spawns the indexing and merge
/// actors, periodically observes their counters, and respawns the whole
/// generation when an actor fails.
pub struct IndexingPipeline {
    params: IndexingPipelineParams,
    // Counters accumulated by previous (terminated) generations; the live
    // generation's counters are added on top of these in the Observe handler.
    previous_generations_statistics: IndexingStatistics,
    // Statistics exposed as this actor's observable state.
    statistics: IndexingStatistics,
    // Handles of the currently running generation; `None` when no generation
    // is running (before the first spawn, or after termination).
    handlers: Option<IndexingPipelineHandler>,
    // Kill switch shared by all actors of the current generation.
    kill_switch: KillSwitch,
}
#[async_trait]
impl Actor for IndexingPipeline {
    type ObservableState = IndexingStatistics;

    fn observable_state(&self) -> Self::ObservableState {
        self.statistics.clone()
    }

    fn name(&self) -> String {
        "IndexingPipeline".to_string()
    }

    fn span(&self, _ctx: &ActorContext<Self>) -> Span {
        // Deliberately empty: `spawn_pipeline` opens its own instrumented span
        // carrying the index id and generation fields.
        info_span!("")
    }

    /// Boots the supervision loop: spawns the first generation of actors, then
    /// kicks off the periodic `Observe` and `Supervise` self-message cycles.
    async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
        self.handle(Spawn::default(), ctx).await?;
        self.handle(Observe, ctx).await?;
        self.handle(Supervise, ctx).await?;
        Ok(())
    }
}
impl IndexingPipeline {
    /// Creates a new pipeline supervisor. No indexing actor is spawned here:
    /// the first generation is spawned when the actor handles its initial
    /// `Spawn` message (see `Actor::initialize`).
    pub fn new(params: IndexingPipelineParams) -> Self {
        Self {
            params,
            previous_generations_statistics: Default::default(),
            handlers: None,
            kill_switch: KillSwitch::default(),
            statistics: IndexingStatistics::default(),
        }
    }

    /// Returns the handles of all actors of the current generation as
    /// `Supervisable` trait objects, or an empty vec when nothing is running.
    fn supervisables(&self) -> Vec<&dyn Supervisable> {
        if let Some(handlers) = self.handlers.as_ref() {
            let supervisables: Vec<&dyn Supervisable> = vec![
                &handlers.source,
                &handlers.indexer,
                &handlers.packager,
                &handlers.uploader,
                &handlers.publisher,
                &handlers.garbage_collector,
                &handlers.merge_planner,
                &handlers.merge_split_downloader,
                &handlers.merge_executor,
                &handlers.merge_packager,
                &handlers.merge_uploader,
                &handlers.merge_publisher,
            ];
            supervisables
        } else {
            Vec::new()
        }
    }

    /// Aggregates the health of every supervised actor into a pipeline health:
    /// - at least one failed/unhealthy actor => `FailureOrUnhealthy`;
    /// - no failure and no actor still healthy (all exited) => `Success`;
    /// - otherwise => `Healthy`.
    fn healthcheck(&self) -> Health {
        let mut healthy_actors: Vec<&str> = Default::default();
        let mut failure_or_unhealthy_actors: Vec<&str> = Default::default();
        let mut success_actors: Vec<&str> = Default::default();
        for supervisable in self.supervisables() {
            match supervisable.health() {
                Health::Healthy => {
                    healthy_actors.push(supervisable.name());
                }
                Health::FailureOrUnhealthy => {
                    failure_or_unhealthy_actors.push(supervisable.name());
                }
                Health::Success => {
                    success_actors.push(supervisable.name());
                }
            }
        }
        if !failure_or_unhealthy_actors.is_empty() {
            error!(index=%self.params.index_id, gen=self.generation(), healthy=?healthy_actors, failure_or_unhealthy_actors=?failure_or_unhealthy_actors, success=?success_actors, "indexing pipeline error.");
            return Health::FailureOrUnhealthy;
        }
        if healthy_actors.is_empty() {
            // Every actor exited without failure: the pipeline is done.
            info!(index=%self.params.index_id, gen=self.generation(), "indexing-pipeline-success");
            return Health::Success;
        }
        debug!(index=%self.params.index_id, gen=self.generation(), healthy=?healthy_actors, failure_or_unhealthy_actors=?failure_or_unhealthy_actors, success=?success_actors, "pipeline is judged healthy.");
        Health::Healthy
    }

    /// Current generation number, i.e. how many times a set of indexing actors
    /// has been spawned so far.
    fn generation(&self) -> usize {
        self.statistics.generation
    }

    /// Spawns a complete generation of indexing + merge actors, wired from the
    /// downstream end (publishers) back to the upstream end (source), so that
    /// every actor receives the mailbox of its consumer at construction time.
    /// All actors share a freshly reset kill switch.
    #[instrument(name="", level="info", skip_all, fields(index=%self.params.index_id, gen=self.generation()))]
    async fn spawn_pipeline(&mut self, ctx: &ActorContext<Self>) -> anyhow::Result<()> {
        self.statistics.num_spawn_attempts += 1;
        // Fresh kill switch per generation: killing the previous generation
        // must not take the new one down.
        self.kill_switch = KillSwitch::default();
        let stable_multitenant_merge_policy = StableMultitenantWithTimestampMergePolicy {
            demux_enabled: self.params.indexing_settings.demux_enabled,
            demux_factor: self.params.indexing_settings.merge_policy.demux_factor,
            demux_field_name: self.params.indexing_settings.demux_field.clone(),
            merge_enabled: self.params.indexing_settings.merge_enabled,
            merge_factor: self.params.indexing_settings.merge_policy.merge_factor,
            max_merge_factor: self.params.indexing_settings.merge_policy.max_merge_factor,
            split_num_docs_target: self.params.indexing_settings.split_num_docs_target,
            ..Default::default()
        };
        let merge_policy: Arc<dyn MergePolicy> = Arc::new(stable_multitenant_merge_policy);
        info!(
            root_dir = %self.params.indexing_directory.path().display(),
            merge_policy = ?merge_policy,
            "spawn-indexing-pipeline",
        );
        let split_store = IndexingSplitStore::create_with_local_store(
            self.params.storage.clone(),
            self.params.indexing_directory.cache_directory.as_path(),
            IndexingSplitStoreParams {
                max_num_bytes: self.params.split_store_max_num_bytes,
                max_num_splits: self.params.split_store_max_num_splits,
            },
            merge_policy.clone(),
        )?;
        // Fetch the published splits to seed the merge planner and to prune
        // local cached splits that are no longer published.
        let published_splits = self
            .params
            .metastore
            .list_splits(&self.params.index_id, SplitState::Published, None, None)
            .await?
            .into_iter()
            .map(|split| split.split_metadata)
            .collect::<Vec<_>>();
        split_store
            .remove_dangling_splits(&published_splits)
            .await?;
        // The merge planner mailbox is created ahead of the planner itself:
        // both publishers need it before the planner actor is spawned.
        let (merge_planner_mailbox, merge_planner_inbox) =
            create_mailbox::<MergePlanner>("MergePlanner".to_string(), QueueCapacity::Unbounded);
        let garbage_collector = GarbageCollector::new(
            self.params.index_id.clone(),
            split_store.clone(),
            self.params.metastore.clone(),
        );
        let (garbage_collector_mailbox, garbage_collector_handler) = ctx
            .spawn_actor(garbage_collector)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        // Merge chain, spawned downstream-first:
        // publisher <- uploader <- packager <- executor <- split downloader.
        let merge_publisher = Publisher::new(
            PublisherType::MergePublisher,
            self.params.source.source_id.clone(),
            self.params.metastore.clone(),
            merge_planner_mailbox.clone(),
            garbage_collector_mailbox.clone(),
            None,
        );
        let (merge_publisher_mailbox, merge_publisher_handler) = ctx
            .spawn_actor(merge_publisher)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        let merge_uploader = Uploader::new(
            "MergeUploader",
            self.params.metastore.clone(),
            split_store.clone(),
            merge_publisher_mailbox,
        );
        let (merge_uploader_mailbox, merge_uploader_handler) = ctx
            .spawn_actor(merge_uploader)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        // Resolve the doc mapper's tag field names against the schema; a
        // missing field is a configuration error and aborts the spawn.
        let index_schema = self.params.doc_mapper.schema();
        let tag_fields = self
            .params
            .doc_mapper
            .tag_field_names()
            .iter()
            .map(|field_name| {
                index_schema
                    .get_field(field_name)
                    .context(format!("Field `{}` must exist in the schema.", field_name))
                    .map(|field| NamedField {
                        name: field_name.clone(),
                        field,
                        field_type: index_schema.get_field_entry(field).field_type().clone(),
                    })
            })
            .collect::<Result<Vec<_>, _>>()?;
        let merge_packager =
            Packager::new("MergePackager", tag_fields.clone(), merge_uploader_mailbox);
        let (merge_packager_mailbox, merge_packager_handler) = ctx
            .spawn_actor(merge_packager)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        let merge_executor = MergeExecutor::new(
            self.params.index_id.clone(),
            merge_packager_mailbox,
            self.params.indexing_settings.timestamp_field.clone(),
            self.params.indexing_settings.demux_field.clone(),
            self.params.indexing_settings.split_num_docs_target as usize,
            self.params.indexing_settings.split_num_docs_target as usize * 2,
        );
        let (merge_executor_mailbox, merge_executor_handler) = ctx
            .spawn_actor(merge_executor)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        let merge_split_downloader = MergeSplitDownloader {
            scratch_directory: self.params.indexing_directory.scratch_directory.clone(),
            storage: split_store.clone(),
            merge_executor_mailbox,
        };
        let (merge_split_downloader_mailbox, merge_split_downloader_handler) = ctx
            .spawn_actor(merge_split_downloader)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        // The planner is seeded with the already-published splits and attached
        // to the mailbox created earlier.
        let published_split_metadatas = published_splits.into_iter().collect_vec();
        let merge_planner = MergePlanner::new(
            published_split_metadatas,
            merge_policy.clone(),
            merge_split_downloader_mailbox,
        );
        let (merge_planner_mailbox, merge_planner_handler) = ctx
            .spawn_actor(merge_planner)
            .set_kill_switch(self.kill_switch.clone())
            .set_mailboxes(merge_planner_mailbox, merge_planner_inbox)
            .spawn();
        // Main indexing chain, also spawned downstream-first. The source
        // mailbox is created up-front because the publisher needs it to send
        // suggestions back to the source.
        let (source_mailbox, source_inbox) =
            create_mailbox::<SourceActor>("SourceActor".to_string(), QueueCapacity::Unbounded);
        let publisher = Publisher::new(
            PublisherType::MainPublisher,
            self.params.source.source_id.clone(),
            self.params.metastore.clone(),
            merge_planner_mailbox,
            garbage_collector_mailbox,
            Some(source_mailbox.clone()),
        );
        let (publisher_mailbox, publisher_handler) = ctx
            .spawn_actor(publisher)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        let uploader = Uploader::new(
            "Uploader",
            self.params.metastore.clone(),
            split_store.clone(),
            publisher_mailbox,
        );
        let (uploader_mailbox, uploader_handler) = ctx
            .spawn_actor(uploader)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        let packager = Packager::new("Packager", tag_fields, uploader_mailbox);
        let (packager_mailbox, packager_handler) = ctx
            .spawn_actor(packager)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        let indexer = Indexer::new(
            self.params.index_id.clone(),
            self.params.doc_mapper.clone(),
            self.params.source.source_id.clone(),
            self.params.metastore.clone(),
            self.params.indexing_directory.clone(),
            self.params.indexing_settings.clone(),
            packager_mailbox,
        );
        let (indexer_mailbox, indexer_handler) = ctx
            .spawn_actor(indexer)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        // Resume the source from its last published checkpoint (empty
        // checkpoint if the source never published anything).
        let index_metadata = self
            .params
            .metastore
            .index_metadata(&self.params.index_id)
            .await?;
        let source_checkpoint = index_metadata
            .checkpoint
            .source_checkpoint(&self.params.source.source_id)
            .cloned()
            .unwrap_or_default();
        let source = quickwit_supported_sources()
            .load_source(self.params.source.clone(), source_checkpoint)
            .await?;
        let actor_source = SourceActor {
            source,
            batch_sink: indexer_mailbox,
        };
        let (_source_mailbox, source_handler) = ctx
            .spawn_actor(actor_source)
            .set_mailboxes(source_mailbox, source_inbox)
            .set_kill_switch(self.kill_switch.clone())
            .spawn();
        // Roll the current statistics into the "previous generations" baseline
        // and record the new generation's handles.
        self.previous_generations_statistics = self.statistics.clone();
        self.statistics.generation += 1;
        self.handlers = Some(IndexingPipelineHandler {
            source: source_handler,
            indexer: indexer_handler,
            packager: packager_handler,
            uploader: uploader_handler,
            publisher: publisher_handler,
            garbage_collector: garbage_collector_handler,
            merge_planner: merge_planner_handler,
            merge_split_downloader: merge_split_downloader_handler,
            merge_executor: merge_executor_handler,
            merge_packager: merge_packager_handler,
            merge_uploader: merge_uploader_handler,
            merge_publisher: merge_publisher_handler,
        });
        Ok(())
    }

    /// Exponential backoff before the next spawn retry: 2^(retry_count + 1)
    /// seconds (2s, 4s, 8s, ...), capped at `MAX_RETRY_DELAY`. The exponent is
    /// clamped to 31 to keep `2u64.pow` well away from overflow.
    fn wait_duration_before_retry(retry_count: usize) -> Duration {
        let max_power = (retry_count as u32 + 1).min(31);
        Duration::from_secs(2u64.pow(max_power) as u64).min(MAX_RETRY_DELAY)
    }

    /// Tears down the current generation: flips the shared kill switch, then
    /// kills every actor handle, awaiting all terminations concurrently.
    async fn terminate(&mut self) {
        self.kill_switch.kill();
        if let Some(handlers) = self.handlers.take() {
            tokio::join!(
                handlers.source.kill(),
                handlers.indexer.kill(),
                handlers.packager.kill(),
                handlers.uploader.kill(),
                handlers.publisher.kill(),
                handlers.garbage_collector.kill(),
                handlers.merge_planner.kill(),
                handlers.merge_split_downloader.kill(),
                handlers.merge_executor.kill(),
                handlers.merge_packager.kill(),
                handlers.merge_uploader.kill(),
                handlers.merge_publisher.kill(),
            );
        }
    }
}
#[async_trait]
impl Handler<Observe> for IndexingPipeline {
    type Reply = ();

    /// Snapshots the counters of the indexer, uploader and publisher actors,
    /// folds them into the accumulated statistics of previous generations,
    /// and reschedules itself one second later.
    async fn handle(
        &mut self,
        _: Observe,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        if let Some(handlers) = &self.handlers {
            // Observe the three counter-bearing actors concurrently.
            let (indexer_obs, uploader_obs, publisher_obs) = join!(
                handlers.indexer.observe(),
                handlers.uploader.observe(),
                handlers.publisher.observe(),
            );
            // Generation and spawn-attempt counters are owned by this actor,
            // not by the observed actors: carry them over unchanged.
            let generation = self.statistics.generation;
            let num_spawn_attempts = self.statistics.num_spawn_attempts;
            self.statistics = self
                .previous_generations_statistics
                .clone()
                .add_actor_counters(&*indexer_obs, &*uploader_obs, &*publisher_obs)
                .set_generation(generation)
                .set_num_spawn_attempts(num_spawn_attempts);
        }
        ctx.schedule_self_msg(Duration::from_secs(1), Observe).await;
        Ok(())
    }
}
#[async_trait]
impl Handler<Supervise> for IndexingPipeline {
    type Reply = ();

    /// Periodic supervision tick: evaluates the health of the running
    /// generation (if any), reacts to it, and reschedules itself.
    async fn handle(
        &mut self,
        _: Supervise,
        ctx: &ActorContext<Self>,
    ) -> Result<(), ActorExitStatus> {
        // Only run the healthcheck when a generation is actually running.
        let health_opt = if self.handlers.is_some() {
            Some(self.healthcheck())
        } else {
            None
        };
        match health_opt {
            None | Some(Health::Healthy) => {}
            Some(Health::FailureOrUnhealthy) => {
                // Kill the whole generation and schedule a fresh spawn.
                self.terminate().await;
                ctx.schedule_self_msg(quickwit_actors::HEARTBEAT, Spawn { retry_count: 0 })
                    .await;
            }
            Some(Health::Success) => {
                // Every actor exited successfully: the pipeline itself exits.
                return Err(ActorExitStatus::Success);
            }
        }
        ctx.schedule_self_msg(quickwit_actors::HEARTBEAT, Supervise)
            .await;
        Ok(())
    }
}
#[async_trait]
impl Handler<Spawn> for IndexingPipeline {
type Reply = ();
async fn handle(
&mut self,
spawn: Spawn,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if self.handlers.is_some() {
return Ok(());
}
self.previous_generations_statistics.num_spawn_attempts = 1 + spawn.retry_count;
if let Err(spawn_error) = self.spawn_pipeline(ctx).await {
if let Some(MetastoreError::IndexDoesNotExist { .. }) =
spawn_error.downcast_ref::<MetastoreError>()
{
info!(error = ?spawn_error, "Could not spawn pipeline, index might have been deleted.");
return Err(ActorExitStatus::Success);
}
let retry_delay = Self::wait_duration_before_retry(spawn.retry_count);
error!(error = ?spawn_error, retry_count = spawn.retry_count, retry_delay = ?retry_delay, "Error while spawning indexing pipeline, retrying after some time.");
ctx.schedule_self_msg(
retry_delay,
Spawn {
retry_count: spawn.retry_count + 1,
},
)
.await;
}
Ok(())
}
}
/// Everything needed to build and (re)spawn the actors of an indexing
/// pipeline for one (index, source) pair.
pub struct IndexingPipelineParams {
    pub index_id: String,
    pub doc_mapper: Arc<dyn DocMapper>,
    pub indexing_directory: IndexingDirectory,
    pub indexing_settings: IndexingSettings,
    pub source: SourceConfig,
    // Capacity limits of the local split store (cache).
    pub split_store_max_num_bytes: usize,
    pub split_store_max_num_splits: usize,
    pub metastore: Arc<dyn Metastore>,
    pub storage: Arc<dyn Storage>,
}
impl IndexingPipelineParams {
    /// Builds pipeline parameters from the index metadata: constructs the doc
    /// mapper from the index configuration and creates the per-(index, source)
    /// working directory under `indexing_dir_path`.
    ///
    /// # Errors
    /// Fails if the doc mapper cannot be built from the metadata, or if the
    /// indexing directory cannot be created.
    pub async fn try_new(
        index_metadata: IndexMetadata,
        source: SourceConfig,
        indexing_dir_path: PathBuf,
        split_store_max_num_bytes: usize,
        split_store_max_num_splits: usize,
        metastore: Arc<dyn Metastore>,
        storage: Arc<dyn Storage>,
    ) -> anyhow::Result<Self> {
        // Each (index, source) pair gets its own workspace directory.
        let workspace_path = indexing_dir_path
            .join(&index_metadata.index_id)
            .join(&source.source_id);
        let indexing_directory = IndexingDirectory::create_in_dir(workspace_path).await?;
        let doc_mapper = build_doc_mapper(
            &index_metadata.doc_mapping,
            &index_metadata.search_settings,
            &index_metadata.indexing_settings,
        )?;
        Ok(Self {
            index_id: index_metadata.index_id,
            doc_mapper,
            indexing_directory,
            indexing_settings: index_metadata.indexing_settings,
            source,
            split_store_max_num_bytes,
            split_store_max_num_splits,
            metastore,
            storage,
        })
    }
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::sync::Arc;

    use quickwit_actors::Universe;
    use quickwit_config::{IndexingSettings, SourceParams};
    use quickwit_doc_mapper::default_doc_mapper_for_tests;
    use quickwit_metastore::{IndexMetadata, MetastoreError, MockMetastore};
    use quickwit_storage::RamStorage;

    use super::{IndexingPipeline, *};
    use crate::models::IndexingDirectory;

    #[test]
    fn test_wait_duration() {
        // Backoff is 2^(retry_count + 1) seconds, capped at MAX_RETRY_DELAY.
        assert_eq!(
            IndexingPipeline::wait_duration_before_retry(0),
            Duration::from_secs(2)
        );
        assert_eq!(
            IndexingPipeline::wait_duration_before_retry(1),
            Duration::from_secs(4)
        );
        assert_eq!(
            IndexingPipeline::wait_duration_before_retry(2),
            Duration::from_secs(8)
        );
        assert_eq!(
            IndexingPipeline::wait_duration_before_retry(3),
            Duration::from_secs(16)
        );
        assert_eq!(
            IndexingPipeline::wait_duration_before_retry(8),
            Duration::from_secs(512)
        );
        // 2^10 = 1024s exceeds the 600s cap.
        assert_eq!(
            IndexingPipeline::wait_duration_before_retry(9),
            MAX_RETRY_DELAY
        );
    }

    // Runs a full pipeline against a mock metastore whose `index_metadata`
    // call fails `num_fails` times before succeeding, and returns whether the
    // pipeline eventually exited successfully.
    async fn test_indexing_pipeline_num_fails_before_success(
        mut num_fails: usize,
    ) -> anyhow::Result<bool> {
        quickwit_common::setup_logging_for_tests();
        let index_id = "test-index";
        let mut metastore = MockMetastore::default();
        metastore
            .expect_index_metadata()
            .withf(|index_id| index_id == "test-index")
            .returning(move |_| {
                if num_fails == 0 {
                    let index_metadata =
                        IndexMetadata::for_test("test-index", "ram://indexes/my-index");
                    return Ok(index_metadata);
                }
                // `usize` is `Copy`, so this decrements the closure's own copy
                // of `num_fails`; the caller's value is untouched.
                num_fails -= 1;
                Err(MetastoreError::ConnectionError {
                    message: "MetastoreError Alarm".to_string(),
                })
            });
        metastore
            .expect_list_splits()
            .returning(|_, _, _, _| Ok(Vec::new()));
        metastore
            .expect_mark_splits_for_deletion()
            .returning(|_, _| Ok(()));
        metastore
            .expect_stage_split()
            .withf(move |requested_index_id, _metadata| -> bool { requested_index_id == index_id })
            .times(1)
            .returning(|_, _| Ok(()));
        metastore
            .expect_publish_splits()
            .withf(
                move |requested_index_id, source_id, splits, checkpoint_delta| -> bool {
                    requested_index_id == index_id
                        && source_id == "test-source"
                        && splits.len() == 1
                        && format!("{:?}", checkpoint_delta)
                            .ends_with(":(00000000000000000000..00000000000000001030])")
                },
            )
            .times(1)
            .returning(|_, _, _, _| Ok(()));
        let universe = Universe::new();
        let source_config = SourceConfig {
            source_id: "test-source".to_string(),
            source_params: SourceParams::file(PathBuf::from("data/test_corpus.json")),
        };
        let indexing_pipeline_params = IndexingPipelineParams {
            index_id: index_id.to_string(),
            doc_mapper: Arc::new(default_doc_mapper_for_tests()),
            indexing_directory: IndexingDirectory::for_test().await?,
            indexing_settings: IndexingSettings::for_test(),
            split_store_max_num_bytes: 10_000_000,
            split_store_max_num_splits: 100,
            source: source_config,
            metastore: Arc::new(metastore),
            storage: Arc::new(RamStorage::default()),
        };
        let pipeline = IndexingPipeline::new(indexing_pipeline_params);
        let (_pipeline_mailbox, pipeline_handler) = universe.spawn_actor(pipeline).spawn();
        let (pipeline_exit_status, pipeline_statistics) = pipeline_handler.join().await;
        // A single generation should have indexed everything; spawn attempts
        // = initial attempt + one retry per induced metastore failure.
        assert_eq!(pipeline_statistics.generation, 1);
        assert_eq!(pipeline_statistics.num_spawn_attempts, 1 + num_fails);
        Ok(pipeline_exit_status.is_success())
    }

    #[tokio::test]
    async fn test_indexing_pipeline_retry_1() -> anyhow::Result<()> {
        test_indexing_pipeline_num_fails_before_success(0).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_indexing_pipeline_retry_2() -> anyhow::Result<()> {
        test_indexing_pipeline_num_fails_before_success(2).await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_indexing_pipeline_retry_3() -> anyhow::Result<()> {
        test_indexing_pipeline_num_fails_before_success(3).await?;
        Ok(())
    }

    // Happy path: no induced failures, one generation, one published split.
    #[tokio::test]
    async fn test_indexing_pipeline() -> anyhow::Result<()> {
        quickwit_common::setup_logging_for_tests();
        let mut metastore = MockMetastore::default();
        metastore
            .expect_index_metadata()
            .withf(|index_id| index_id == "test-index")
            .returning(move |_| {
                Ok(IndexMetadata::for_test(
                    "test-index",
                    "ram://indexes/my-index",
                ))
            });
        metastore
            .expect_list_splits()
            .times(3)
            .returning(|_, _, _, _| Ok(Vec::new()));
        metastore
            .expect_mark_splits_for_deletion()
            .times(1)
            .returning(|_, _| Ok(()));
        metastore
            .expect_stage_split()
            .withf(move |index_id, _metadata| -> bool { index_id == "test-index" })
            .times(1)
            .returning(|_, _| Ok(()));
        metastore
            .expect_publish_splits()
            .withf(
                move |index_id, source_id, splits, checkpoint_delta| -> bool {
                    index_id == "test-index"
                        && source_id == "test-source"
                        && splits.len() == 1
                        && format!("{:?}", checkpoint_delta)
                            .ends_with(":(00000000000000000000..00000000000000001030])")
                },
            )
            .times(1)
            .returning(|_, _, _, _| Ok(()));
        let universe = Universe::new();
        let source = SourceConfig {
            source_id: "test-source".to_string(),
            source_params: SourceParams::file(PathBuf::from("data/test_corpus.json")),
        };
        let pipeline_params = IndexingPipelineParams {
            index_id: "test-index".to_string(),
            doc_mapper: Arc::new(default_doc_mapper_for_tests()),
            indexing_directory: IndexingDirectory::for_test().await?,
            indexing_settings: IndexingSettings::for_test(),
            split_store_max_num_bytes: 10_000_000,
            split_store_max_num_splits: 100,
            source,
            metastore: Arc::new(metastore),
            storage: Arc::new(RamStorage::default()),
        };
        let pipeline = IndexingPipeline::new(pipeline_params);
        let (_pipeline_mailbox, pipeline_handler) = universe.spawn_actor(pipeline).spawn();
        let (pipeline_exit_status, pipeline_statistics) = pipeline_handler.join().await;
        assert!(pipeline_exit_status.is_success());
        assert_eq!(pipeline_statistics.generation, 1);
        assert_eq!(pipeline_statistics.num_spawn_attempts, 1);
        assert_eq!(pipeline_statistics.num_published_splits, 1);
        Ok(())
    }
}