use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use quickwit_common::fs::empty_dir;
use quickwit_common::uri::Uri;
use quickwit_config::IndexConfig;
use quickwit_indexing::actors::INDEXING;
use quickwit_indexing::models::CACHE;
use quickwit_indexing::{
delete_splits_with_files, run_garbage_collect, FileEntry, IndexingSplitStore,
SplitDeletionError,
};
use quickwit_metastore::{
IndexMetadata, Metastore, MetastoreError, Split, SplitMetadata, SplitState,
};
use quickwit_storage::{StorageResolverError, StorageUriResolver};
use tantivy::time::OffsetDateTime;
use thiserror::Error;
use tracing::{error, info};
/// Errors returned by the [`IndexService`] operations.
///
/// The first three variants wrap an underlying error and are built
/// automatically by `?` through their `#[from]` conversions.
#[derive(Error, Debug)]
pub enum IndexServiceError {
/// The storage of an index could not be resolved.
#[error("Failed to resolve the storage `{0}`.")]
StorageError(#[from] StorageResolverError),
/// A metastore call failed.
#[error("Metastore error `{0}`.")]
MetastoreError(#[from] MetastoreError),
/// Deleting splits and their files failed.
#[error("Split deletion error `{0}`.")]
SplitDeletionError(#[from] SplitDeletionError),
/// The index config was rejected; the payload is the validation error
/// rendered as a string.
#[error("Invalid index config: {0}.")]
InvalidIndexConfig(String),
}
/// Index management service: creates, lists, deletes, garbage collects and
/// resets indexes through a metastore and the storage holding their splits.
pub struct IndexService {
/// Metastore backing every index and split operation of the service.
metastore: Arc<dyn Metastore>,
/// Resolves the URI of an index into the storage used to delete its split files.
storage_resolver: StorageUriResolver,
/// Root URI joined with the index ID when an index config has no `index_uri`.
default_index_root_uri: Uri,
}
impl IndexService {
pub fn new(
metastore: Arc<dyn Metastore>,
storage_resolver: StorageUriResolver,
default_index_root_uri: Uri,
) -> Self {
Self {
metastore,
storage_resolver,
default_index_root_uri,
}
}
pub async fn get_index(&self, index_id: &str) -> Result<IndexMetadata, IndexServiceError> {
let index_metadata = self.metastore.index_metadata(index_id).await?;
Ok(index_metadata)
}
pub async fn get_all_splits(&self, index_id: &str) -> Result<Vec<Split>, IndexServiceError> {
let splits = self.metastore.list_all_splits(index_id).await?;
Ok(splits)
}
pub async fn get_indexes(&self) -> anyhow::Result<Vec<IndexMetadata>> {
let indexes_metadatas = self.metastore.list_indexes_metadatas().await?;
Ok(indexes_metadatas)
}
pub async fn create_index(
&self,
index_config: IndexConfig,
) -> Result<IndexMetadata, IndexServiceError> {
index_config
.validate()
.map_err(|error| IndexServiceError::InvalidIndexConfig(error.to_string()))?;
let index_id = index_config.index_id.clone();
let index_uri = if let Some(index_uri) = &index_config.index_uri {
index_uri.clone()
} else {
let index_uri = self.default_index_root_uri.join(&index_id).expect(
"Failed to create default index URI. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.",
);
info!(
index_id = %index_id,
index_uri = %index_uri,
"Index config does not specify `index_uri`, falling back to default value.",
);
index_uri
};
let index_metadata = IndexMetadata {
index_id,
index_uri: index_uri.into_string(),
checkpoint: Default::default(),
sources: index_config.sources(),
doc_mapping: index_config.doc_mapping,
indexing_settings: index_config.indexing_settings,
search_settings: index_config.search_settings,
create_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
update_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
};
self.metastore.create_index(index_metadata).await?;
let index_metadata = self
.metastore
.index_metadata(&index_config.index_id)
.await?;
Ok(index_metadata)
}
pub async fn delete_index(
&self,
index_id: &str,
dry_run: bool,
) -> Result<Vec<FileEntry>, IndexServiceError> {
let index_uri = self.metastore.index_metadata(index_id).await?.index_uri;
let storage = self.storage_resolver.resolve(&index_uri)?;
if dry_run {
let all_splits = self
.metastore
.list_all_splits(index_id)
.await?
.into_iter()
.map(|metadata| metadata.split_metadata)
.collect::<Vec<_>>();
let file_entries_to_delete: Vec<FileEntry> =
all_splits.iter().map(FileEntry::from).collect();
return Ok(file_entries_to_delete);
}
let staged_splits = self
.metastore
.list_splits(index_id, SplitState::Staged, None, None)
.await?;
let published_splits = self
.metastore
.list_splits(index_id, SplitState::Published, None, None)
.await?;
let split_ids = staged_splits
.iter()
.chain(published_splits.iter())
.map(|meta| meta.split_id())
.collect::<Vec<_>>();
self.metastore
.mark_splits_for_deletion(index_id, &split_ids)
.await?;
let splits_to_delete = self
.metastore
.list_splits(index_id, SplitState::MarkedForDeletion, None, None)
.await?
.into_iter()
.map(|metadata| metadata.split_metadata)
.collect::<Vec<_>>();
let split_store = IndexingSplitStore::create_with_no_local_store(storage);
let deleted_entries = delete_splits_with_files(
index_id,
split_store,
self.metastore.clone(),
splits_to_delete,
None,
)
.await?;
self.metastore.delete_index(index_id).await?;
Ok(deleted_entries)
}
pub async fn garbage_collect_index(
&self,
index_id: &str,
grace_period: Duration,
dry_run: bool,
) -> anyhow::Result<Vec<FileEntry>> {
let index_uri = self.metastore.index_metadata(index_id).await?.index_uri;
let storage = self.storage_resolver.resolve(&index_uri)?;
let split_store = IndexingSplitStore::create_with_no_local_store(storage);
let deleted_entries = run_garbage_collect(
index_id,
split_store,
self.metastore.clone(),
grace_period,
Duration::ZERO,
dry_run,
None,
)
.await?;
Ok(deleted_entries)
}
pub async fn reset_index(&self, index_id: &str) -> anyhow::Result<()> {
let index_metadata = self.metastore.index_metadata(index_id).await?;
let storage = self.storage_resolver.resolve(&index_metadata.index_uri)?;
let splits = self.metastore.list_all_splits(index_id).await?;
let split_ids: Vec<&str> = splits.iter().map(|split| split.split_id()).collect();
self.metastore
.mark_splits_for_deletion(index_id, &split_ids)
.await?;
let split_metas: Vec<SplitMetadata> = splits
.into_iter()
.map(|split| split.split_metadata)
.collect();
let split_store = IndexingSplitStore::create_with_no_local_store(storage);
if let Err(err) = delete_splits_with_files(
index_id,
split_store,
self.metastore.clone(),
split_metas,
None,
)
.await
{
error!(metastore_uri = %self.metastore.uri(), index_id = %index_id, error = %err, "Not all split files could be deleted during garbage collection.");
}
Ok(())
}
}
/// Builds the path of the cache directory of a source:
/// `<data_dir_path>/<INDEXING>/<index_id>/<source_id>/<CACHE>`.
///
/// The path is only computed; the file system is not touched.
pub fn get_cache_directory_path(data_dir_path: &Path, index_id: &str, source_id: &str) -> PathBuf {
    let mut cache_directory_path = data_dir_path.to_path_buf();
    cache_directory_path.push(INDEXING);
    cache_directory_path.push(index_id);
    cache_directory_path.push(source_id);
    cache_directory_path.push(CACHE);
    cache_directory_path
}
/// Empties the cache directory of the source `source_id` of the index `index_id`, located
/// under `data_dir_path` as computed by [`get_cache_directory_path`].
///
/// # Errors
///
/// Propagates, as an `anyhow::Error`, the failure reported by `empty_dir`.
pub async fn clear_cache_directory(
    data_dir_path: &Path,
    index_id: String,
    source_id: String,
) -> anyhow::Result<()> {
    let cache_dir = get_cache_directory_path(data_dir_path, index_id.as_str(), source_id.as_str());
    info!(path = %cache_dir.display(), "Clearing cache directory.");
    empty_dir(&cache_dir).await?;
    Ok(())
}