use std::{
borrow::Borrow,
collections::{HashMap, HashSet},
path::{Path, PathBuf},
rc::Rc,
sync::Arc,
time::Instant,
};
use exocore_chain::{
block::{BlockHeight, BlockOffset},
chain,
engine::Event,
operation::{Operation, OperationId},
pending, EngineHandle, EngineOperationStatus,
};
use exocore_core::{cell::FullCell, time::Clock};
use exocore_protos::{
generated::exocore_store::{
entity_mutation::Mutation, Entity, EntityMutation, EntityQuery,
EntityResult as EntityResultProto, EntityResultSource, EntityResults, Trait,
},
prost::{Message, ProstDateTimeExt},
registry::Registry,
};
use gc::GarbageCollector;
use itertools::Itertools;
use super::{
mutation_index::{IndexOperation, MutationIndex, MutationMetadata},
top_results::ReScoredTopResultsIterable,
};
use crate::{
entity::EntityId,
error::Error,
ordering::{OrderingValueExt, OrderingValueWrapper},
};
mod config;
pub use config::*;
mod aggregator;
pub(crate) use aggregator::*;
mod gc;
pub use gc::GarbageCollectorConfig;
#[cfg(test)]
pub(crate) mod test_index;
#[cfg(test)]
mod tests;
/// Index of entities backed by two mutation indices: one for operations that
/// were indexed from the chain and one for operations that are still in the
/// pending store (or in chain blocks not yet indexed in the chain index).
///
/// Queries are executed against both indices and their results are merged
/// (see `search`).
pub struct EntityIndex<CS, PS>
where
CS: chain::ChainStore,
PS: pending::PendingStore,
{
/// Configuration the index was opened with.
config: EntityIndexConfig,
/// In-memory index of not-yet-chain-indexed mutations; fully rebuilt by
/// `reindex_pending`.
pending_index: MutationIndex,
/// Directory of the chain index (`chain` under the cell's store directory).
chain_index_dir: PathBuf,
/// Index of mutations coming from chain blocks; memory-mapped on disk or
/// in-memory depending on `config.chain_index_in_memory`.
chain_index: MutationIndex,
/// Cached offset of the highest block indexed in `chain_index`. When `None`,
/// `last_chain_indexed_block` asks the chain index itself.
chain_index_last_block: Option<BlockOffset>,
/// Cell this index belongs to; used to get the schemas registry.
full_cell: FullCell,
/// Handle on the chain engine, used to read blocks and operations.
chain_handle: EngineHandle<CS, PS>,
/// Garbage collector to which entities are flagged while searching.
gc: GarbageCollector,
}
impl<CS, PS> EntityIndex<CS, PS>
where
CS: chain::ChainStore,
PS: pending::PendingStore,
{
/// Opens the entity index of the given cell, creating its chain index
/// directory if needed.
///
/// The pending index is always created in memory. If the chain index has no
/// indexed block while the chain already contains blocks, the chain is
/// re-indexed. The pending index is then (re-)indexed in all cases.
///
/// # Errors
/// Fails if the cell has no store directory, if the chain index directory
/// cannot be created, or if any index or chain engine call fails.
pub fn open_or_create(
    cell: FullCell,
    config: EntityIndexConfig,
    chain_handle: EngineHandle<CS, PS>,
    clock: Clock,
) -> Result<EntityIndex<CS, PS>, Error> {
    let pending_index = {
        let mut in_memory = MutationIndex::create_in_memory(
            config.pending_index_config,
            cell.cell().schemas().clone(),
        )?;
        in_memory.set_full_text_boost(config.pending_index_boost);
        in_memory
    };

    // The chain index lives in the `chain` sub-directory of the cell's store.
    let chain_index_dir = {
        let mut dir = cell
            .cell()
            .store_directory()
            .ok_or_else(|| anyhow!("Cell doesn't have an path configured"))?;
        dir.push("chain");
        dir
    };
    let dir_is_missing = std::fs::metadata(&chain_index_dir).is_err();
    if dir_is_missing {
        std::fs::create_dir_all(&chain_index_dir)?;
    }

    let chain_index = Self::create_chain_index(config, cell.cell().schemas(), &chain_index_dir)?;

    let mut index = EntityIndex {
        config,
        pending_index,
        chain_index_dir,
        chain_index,
        chain_index_last_block: None,
        full_cell: cell,
        chain_handle,
        gc: GarbageCollector::new(config.garbage_collector, clock),
    };

    // A chain with blocks but an empty chain index means the index has to be
    // rebuilt from the chain.
    let chain_has_blocks = index.chain_handle.get_chain_last_block_info()?.is_some();
    let chain_index_is_empty = index.chain_index.highest_indexed_block()?.is_none();
    if chain_index_is_empty && chain_has_blocks {
        index.reindex_chain()?;
    }

    index.reindex_pending()?;

    Ok(index)
}
/// Handles a single chain engine event.
///
/// Convenience wrapper around `handle_chain_engine_events` for one event;
/// returns the same pair (affected operation ids, number of index
/// operations).
pub fn handle_chain_engine_event(
    &mut self,
    event: Event,
) -> Result<(Vec<OperationId>, usize), Error> {
    let single_event = std::iter::once(event);
    self.handle_chain_engine_events(single_event)
}
/// Handles a stream of chain engine events and updates the indices
/// accordingly.
///
/// Consecutive `NewPendingOperation` events are batched and applied to the
/// pending index in one go, right before the next event of another kind is
/// handled (or at the end of the stream).
///
/// Returns the ids of the operations that were affected along with the
/// number of index operations reported by the index updates.
///
/// # Errors
/// Returns `Error::Fatal` when the chain diverged at or before the last
/// indexed block, and propagates any indexing error.
pub fn handle_chain_engine_events<E>(
    &mut self,
    events: E,
) -> Result<(Vec<OperationId>, usize), Error>
where
    E: Iterator<Item = Event>,
{
    let mut indexed_count = 0;
    let mut affected_operations = Vec::new();
    let mut pending_batch = Vec::new();

    for event in events {
        // Accumulate pending operations; anything else falls through.
        let event = match event {
            Event::NewPendingOperation(op_id) => {
                pending_batch.push(op_id);
                affected_operations.push(op_id);
                continue;
            }
            other => other,
        };

        // Flush the accumulated batch before handling a non-pending event.
        if !pending_batch.is_empty() {
            let batch = std::mem::take(&mut pending_batch);
            indexed_count += self.handle_chain_pending_operations(batch.into_iter())?;
        }

        match event {
            Event::Started => {
                info!("Chain engine is ready, indexing pending store & chain");
                self.index_chain_new_blocks(Some(&mut affected_operations))?;
                self.reindex_pending()?;
            }
            Event::StreamDiscontinuity => {
                warn!("Got a stream discontinuity. Forcing re-indexation of pending...");
                self.reindex_pending()?;
            }
            Event::NewChainBlock(block_offset) => {
                debug!(
                    "Got new block at offset {}, checking if we can index a new block",
                    block_offset
                );
                indexed_count += self.index_chain_new_blocks(Some(&mut affected_operations))?;
            }
            Event::ChainDiverged(diverged_block_offset) => {
                let highest_indexed_block = self.chain_index.highest_indexed_block()?;
                warn!(
                    "Chain has diverged at offset={}. Highest indexed block at = {:?}",
                    diverged_block_offset, highest_indexed_block
                );

                match highest_indexed_block {
                    Some(last_indexed_offset) if last_indexed_offset < diverged_block_offset => {
                        warn!("Divergence is after last indexed offset, we only re-index pending");
                        self.reindex_pending()?;
                    }
                    Some(last_indexed_offset) => {
                        return Err(Error::Fatal(anyhow!(
                            "Chain has diverged at an offset={}, which is before last indexed block at offset {}",
                            diverged_block_offset, last_indexed_offset
                        )));
                    }
                    None => {
                        warn!("Diverged with an empty chain index. Re-indexing...");
                        self.reindex_chain()?;
                    }
                }
            }
            // Pending operations were all diverted to the batch above.
            Event::NewPendingOperation(_op_id) => unreachable!(),
        }
    }

    // Flush whatever is left once the stream is exhausted.
    if !pending_batch.is_empty() {
        indexed_count += self.handle_chain_pending_operations(pending_batch.into_iter())?;
    }

    Ok((affected_operations, indexed_count))
}
/// Runs `query` against the chain and pending indices and returns one page
/// of entities.
///
/// Matching mutations of both indices are merged on their sort value,
/// de-duplicated per entity and aggregated through `EntityAggregator`.
/// Entities that are deleted, or whose matched operation is not active
/// anymore, are flagged to the garbage collector and skipped unless
/// `query.include_deleted` is set.
///
/// Traits are only fetched when the hash of the results differs from
/// `query.result_hash`; otherwise `skipped_hash` is set in the returned
/// results and entities are returned without traits.
///
/// # Errors
/// Returns an error if one of the indices fails to execute the query. A
/// failure to fetch the mutations of a single entity is only logged, and the
/// entity is left out of the results.
pub fn search<Q: Borrow<EntityQuery>>(&self, query: Q) -> Result<EntityResults, Error> {
    let begin_instant = Instant::now();

    let query = query.borrow();
    let query_include_deleted = query.include_deleted;
    let mut current_page = query
        .paging
        .clone()
        .unwrap_or_else(crate::query::default_paging);
    crate::query::validate_paging(&mut current_page);

    // Paging is applied on aggregated entities below, so the mutation
    // indices are queried without any paging.
    let mutations_query = EntityQuery {
        paging: None,
        ..query.clone()
    };

    let chain_results = self.chain_index.search_iter(&mutations_query)?;
    let chain_hits = chain_results.total_results;
    let pending_results = self.pending_index.search_iter(&mutations_query)?;
    let pending_hits = pending_results.total_results;
    let after_query_instant = Instant::now();

    // Tag each result with its source and merge both streams, taking the
    // highest sort value first.
    let chain_results = chain_results.map(|res| (res, EntityResultSource::Chain));
    let pending_results = pending_results.map(|res| (res, EntityResultSource::Pending));
    let combined_results = chain_results
        .merge_by(pending_results, |(res1, _src1), (res2, _src2)| {
            res1.sort_value >= res2.sort_value
        });

    let hasher = result_hasher();
    let mut digest = hasher.digest();

    // Aggregated mutations are cached since an entity can be matched by
    // several mutations.
    let mut entity_mutations_cache = HashMap::<EntityId, Rc<EntityAggregator>>::new();
    let mut matched_entities = HashSet::new();

    // Once at least one result was within the page, the first out-of-page
    // result stops the iteration.
    let mut got_results = false;
    let early_exit = std::cell::Cell::new(false);

    let mut entity_results = combined_results
        .take_while(|(_matched_mutation, _index_source)| !early_exit.get())
        .flat_map(|(matched_mutation, index_source)| {
            let entity_id = matched_mutation.entity_id.clone();

            // Entity already returned through a previous mutation.
            if matched_entities.contains(&entity_id) {
                return None;
            }

            let entity_mutations = if let Some(mutations) =
                entity_mutations_cache.get(&entity_id)
            {
                mutations.clone()
            } else {
                let mut entity_mutations = self
                    .fetch_entity_mutations_metadata(&entity_id)
                    .map_err(|err| {
                        error!(
                            "Error fetching mutations metadata for entity_id={} from indices: {}",
                            entity_id, err
                        );
                        err
                    })
                    .ok()?;

                if !query.projections.is_empty() {
                    entity_mutations.annotate_projections(query.projections.as_slice());
                }

                let entity_mutations = Rc::new(entity_mutations);
                entity_mutations_cache
                    .insert(entity_id.clone(), entity_mutations.clone());
                entity_mutations
            };

            let operation_still_present = entity_mutations
                .active_operations
                .contains(&matched_mutation.operation_id);
            if entity_mutations.deletion_date.is_some() || !operation_still_present {
                // Deleted entity, or the matched mutation is not active anymore.
                self.gc.maybe_flag_for_collection(&entity_mutations);

                if !query_include_deleted {
                    return None;
                }
            }

            matched_entities.insert(matched_mutation.entity_id.clone());

            let ordering_value = matched_mutation.sort_value.clone();
            if ordering_value.value.is_within_page_bound(&current_page) {
                got_results = true;

                let creation_date = entity_mutations.creation_date.map(|t| t.to_proto_timestamp());
                let modification_date = entity_mutations.modification_date.map(|t| t.to_proto_timestamp());
                let deletion_date = entity_mutations.deletion_date.map(|t| t.to_proto_timestamp());

                // Traits are left empty here; they get populated later, and
                // only if the results hash changed.
                let result = EntityResult {
                    matched_mutation,
                    ordering_value: ordering_value.clone(),
                    proto: EntityResultProto {
                        entity: Some(Entity {
                            id: entity_id,
                            traits: Vec::new(),
                            creation_date,
                            modification_date,
                            deletion_date,
                            last_operation_id: entity_mutations.last_operation_id,
                        }),
                        source: index_source.into(),
                        ordering_value: Some(ordering_value.value),
                        hash: entity_mutations.hash,
                    },
                    mutations: entity_mutations,
                };

                Some(result)
            } else {
                if got_results {
                    early_exit.set(true);
                }

                None
            }
        })
        .top_negatively_rescored_results(
            current_page.count as usize,
            |result: &EntityResult| {
                (result.ordering_value.clone(), result.ordering_value.clone())
            },
        )
        .fold(
            Vec::new(),
            |mut results, result| {
                digest.update(&result.mutations.hash.to_ne_bytes());
                results.push(result);
                results
            },
        );
    let after_aggregate_instant = Instant::now();

    // The next page starts from the ordering value of the last returned result.
    let next_page = if let Some(last_result) = entity_results.last() {
        let mut new_page = current_page.clone();

        let ascending = query
            .ordering
            .as_ref()
            .map(|s| s.ascending)
            .unwrap_or(false);
        if !ascending {
            new_page.before_ordering_value =
                Some(last_result.matched_mutation.sort_value.value.clone());
        } else {
            new_page.after_ordering_value =
                Some(last_result.matched_mutation.sort_value.value.clone());
        }

        Some(new_page)
    } else {
        None
    };

    // Traits don't need to be fetched if the caller already has these results.
    let results_hash = digest.finalize();
    let skipped_hash = results_hash == query.result_hash;
    if !skipped_hash {
        self.populate_results_traits(&mut entity_results, query.include_deleted);
    }

    let end_instant = Instant::now();
    debug!(
        "Query done chain_hits={} pending_hits={} aggr_fetch={} query={:?} aggr={:?} fetch={:?} total={:?} page={:?} next_page={:?}",
        chain_hits,
        pending_hits,
        entity_mutations_cache.len(),
        after_query_instant - begin_instant,
        after_aggregate_instant - after_query_instant,
        end_instant - after_aggregate_instant,
        end_instant - begin_instant,
        current_page,
        next_page,
    );

    let entities = entity_results.into_iter().map(|res| res.proto).collect();
    Ok(EntityResults {
        entities,
        skipped_hash,
        next_page,
        current_page: Some(current_page),
        estimated_count: (chain_hits + pending_hits) as u32,
        hash: results_hash,
    })
}
/// Runs the garbage collector and returns the deletion mutations it
/// generated.
///
/// Nothing is collected (an empty vector is returned) as long as no block
/// has been indexed in the chain index.
///
/// NOTE(review): entities are fetched from the chain index only here, while
/// `search` flags entities aggregated from both the pending and chain
/// indices — confirm this asymmetry is intended.
///
/// # Errors
/// Fails if the last indexed chain block cannot be retrieved.
pub fn run_garbage_collector(&self) -> Result<Vec<EntityMutation>, Error> {
    let indexed_block = self
        .last_chain_indexed_block()
        .map_err(|err| anyhow!("Couldn't get last chain indexed block: {}", err))?;

    match indexed_block {
        None => Ok(Vec::new()),
        Some(_) => {
            let fetch_entity = |entity_id| self.chain_index.fetch_entity_mutations(entity_id);
            Ok(self.gc.run(fetch_entity))
        }
    }
}
/// Creates the chain index: in memory when `config.chain_index_in_memory`
/// is set, otherwise opened or created as a memory-mapped index in
/// `chain_index_dir`.
fn create_chain_index<P: AsRef<Path>>(
    config: EntityIndexConfig,
    schemas: &Arc<Registry>,
    chain_index_dir: P,
) -> Result<MutationIndex, Error> {
    let schemas = schemas.clone();

    if config.chain_index_in_memory {
        return MutationIndex::create_in_memory(config.chain_index_config, schemas);
    }

    MutationIndex::open_or_create_mmap(
        config.chain_index_config,
        schemas,
        chain_index_dir.as_ref(),
    )
}
/// Drops the pending index and rebuilds it from scratch.
///
/// The new index receives, in this order:
/// 1. operations of chain blocks located after the last block indexed in
///    the chain index;
/// 2. operations of the pending store that are still pending (not committed
///    to any block).
///
/// # Errors
/// Fails if the index cannot be created, if the chain engine cannot be
/// queried, or if the operations cannot be applied to the index.
fn reindex_pending(&mut self) -> Result<(), Error> {
    self.pending_index = MutationIndex::create_in_memory(
        self.config.pending_index_config,
        self.full_cell.cell().schemas().clone(),
    )?;
    self.pending_index
        .set_full_text_boost(self.config.pending_index_boost);

    let last_chain_indexed_offset = self
        .last_chain_indexed_block()?
        .map(|(offset, _height)| offset)
        .unwrap_or(0);

    info!(
        "Clearing & re-indexing pending index. last_chain_indexed_offset={}",
        last_chain_indexed_offset
    );

    // Committed operations are taken from the chain below, so only
    // operations that are still pending are kept from the pending store.
    let pending_iter = self
        .chain_handle
        .get_pending_operations(..)?
        .into_iter()
        .filter(|op| op.status == EngineOperationStatus::Pending);

    // Operations of blocks that are in the chain but not in the chain index
    // yet.
    let chain_iter = self
        .chain_handle
        .get_chain_operations(Some(last_chain_indexed_offset))
        .filter(move |op| {
            if let EngineOperationStatus::Committed(offset, _height) = op.status {
                offset > last_chain_indexed_offset
            } else {
                false
            }
        });

    let mutations_iter = chain_iter
        .chain(pending_iter)
        .flat_map(IndexOperation::from_pending_engine_operation);
    self.pending_index.apply_operations(mutations_iter)?;

    Ok(())
}
/// Wipes the chain index and rebuilds it from the chain, then rebuilds the
/// pending index.
///
/// # Errors
/// Fails if the index directory cannot be removed or re-created, or if any
/// of the indexing steps fails.
fn reindex_chain(&mut self) -> Result<(), Error> {
info!("Clearing & reindexing chain index");
// Swap the chain index for a temporary in-memory one before touching the
// directory — presumably so that the previous index doesn't hold on to files
// in the directory that is about to be removed (TODO confirm).
self.chain_index = MutationIndex::create_in_memory(
self.config.pending_index_config,
self.full_cell.cell().schemas().clone(),
)?;
// Start from an empty directory.
std::fs::remove_dir_all(&self.chain_index_dir)?;
std::fs::create_dir_all(&self.chain_index_dir)?;
// Re-create the actual chain index, as configured.
self.chain_index = Self::create_chain_index(
self.config,
self.full_cell.cell().schemas(),
&self.chain_index_dir,
)?;
// Index the chain, then whatever is left for the pending index.
self.index_chain_new_blocks(None)?;
self.reindex_pending()?;
Ok(())
}
/// Indexes in the chain index the operations of chain blocks that were not
/// indexed yet and that are at least `chain_index_min_depth` blocks deep.
///
/// Nothing is indexed if the depth between the last chain block and the
/// last indexed block is below `chain_index_min_depth` or
/// `chain_index_depth_leeway`.
///
/// Unless the pending index has no indexed block, every operation indexed
/// in the chain index is also deleted from the pending index.
///
/// The id of each operation read from the chain is appended to
/// `affected_operations` when provided.
/// NOTE(review): ids are appended before the offset / depth filter, so
/// operations that end up not being indexed are reported too — confirm
/// that callers expect this.
///
/// Returns the number of operations that were deleted from the pending
/// index.
///
/// # Errors
/// Fails if the chain has no block, or if a chain engine or index call
/// fails.
fn index_chain_new_blocks(
    &mut self,
    affected_operations: Option<&mut Vec<OperationId>>,
) -> Result<usize, Error> {
    let (_last_chain_block_offset, last_chain_block_height) = self
        .chain_handle
        .get_chain_last_block_info()?
        .ok_or_else(|| anyhow!("Tried to index chain, but it had no blocks in it"))?;

    let chain_index_min_depth = self.config.chain_index_min_depth;
    let chain_index_depth_leeway = self.config.chain_index_depth_leeway;
    let last_indexed_block = self.last_chain_indexed_block()?;
    let offset_from = last_indexed_block.map(|(offset, _height)| offset);
    if let Some((_last_indexed_offset, last_indexed_height)) = last_indexed_block {
        // Saturating, like the per-operation depth check below: an indexed
        // height above the chain's last height must not underflow.
        let depth = last_chain_block_height.saturating_sub(last_indexed_height);
        if depth < chain_index_min_depth || depth < chain_index_depth_leeway {
            debug!(
                "No need to index new blocks to chain index. last_chain_block_height={} last_indexed_block_height={} depth={} min_depth={} leeway={}",
                last_chain_block_height, last_indexed_height, depth, chain_index_min_depth, chain_index_depth_leeway,
            );
            return Ok(0);
        }
    }

    let pending_index_empty = self.pending_index.highest_indexed_block()?.is_none();
    let mut pending_index_mutations = Vec::new();
    let mut new_highest_block_offset: Option<BlockOffset> = None;

    let mut affected_operations_ref = affected_operations;
    let operations = self.chain_handle.get_chain_operations(offset_from);
    let chain_index_mutations = operations
        .flat_map(|operation| {
            if let Some(affected_operations) = affected_operations_ref.as_mut() {
                affected_operations.push(operation.operation_id);
            }

            // Only committed operations carry a block offset and height.
            if let EngineOperationStatus::Committed(offset, height) = operation.status {
                Some((offset, height, operation))
            } else {
                None
            }
        })
        .filter(|(offset, height, _engine_operation)| {
            // Keep operations after the last indexed block that are deep enough.
            *offset > offset_from.unwrap_or(0)
                && last_chain_block_height.saturating_sub(*height) >= chain_index_min_depth
        })
        .flat_map(|(offset, _height, engine_operation)| {
            let operation_id = engine_operation.operation_id;
            let (index_ops, entity_id) =
                IndexOperation::from_chain_engine_operation(engine_operation, offset);

            // What gets indexed in the chain index is removed from the
            // pending index.
            if !pending_index_empty {
                pending_index_mutations.push(IndexOperation::DeleteEntityOperation(
                    entity_id,
                    operation_id,
                ));
            }

            if Some(offset) > new_highest_block_offset {
                new_highest_block_offset = Some(offset);
            }

            index_ops
        });

    self.chain_index.apply_operations(chain_index_mutations)?;

    // Refresh the cached last indexed block as soon as the chain index has
    // been updated, whether or not there is anything to delete from the
    // pending index. Otherwise a previously cached offset would go stale and
    // the same blocks would be indexed again on the next call.
    if let Some(new_highest_block_offset) = new_highest_block_offset {
        self.chain_index_last_block = Some(new_highest_block_offset);
    }

    let index_operations_count = pending_index_mutations.len();
    if index_operations_count > 0 {
        info!(
            "Indexed in chain, and deleted from pending {} operations. New chain index last offset is {:?}.",
            index_operations_count,
            new_highest_block_offset
        );

        self.pending_index
            .apply_operations(pending_index_mutations.into_iter())?;
    }

    Ok(index_operations_count)
}
/// Returns the offset and height of the last block indexed in the chain
/// index, or `None` if nothing was indexed yet.
///
/// The cached offset is used when available; otherwise the chain index is
/// asked for its highest indexed block. The block info itself is then
/// fetched from the chain engine.
fn last_chain_indexed_block(&self) -> Result<Option<(BlockOffset, BlockHeight)>, Error> {
    let indexed_offset = match self.chain_index_last_block {
        cached @ Some(_) => cached,
        None => self.chain_index.highest_indexed_block()?,
    };

    match indexed_offset {
        Some(offset) => Ok(self.chain_handle.get_chain_block_info(offset)?),
        None => Ok(None),
    }
}
/// Indexes the given pending store operations in the pending index.
///
/// Operations that cannot be found or fetched from the chain engine are
/// logged and skipped. All index operations are gathered first and then
/// applied to the pending index in a single call.
///
/// Returns the count reported by the pending index.
fn handle_chain_pending_operations<O>(&mut self, operations_id: O) -> Result<usize, Error>
where
    O: Iterator<Item = OperationId>,
{
    let mut mutations = Vec::new();

    for op_id in operations_id {
        match self.chain_handle.get_pending_operation(op_id) {
            Ok(Some(op)) => {
                mutations.extend(IndexOperation::from_pending_engine_operation(op));
            }
            Ok(None) => {
                error!(
                    "An event from chain layer contained a pending operation that wasn't found: operation_id={}",
                    op_id
                );
            }
            Err(err) => {
                error!(
                    "An event from chain layer contained that couldn't be fetched from pending operation: {}",
                    err
                );
            }
        }
    }

    self.pending_index.apply_operations(mutations.into_iter())
}
/// Fetches an entity along with its non-deleted traits (test helper).
#[cfg(test)]
fn fetch_entity(&self, entity_id: &str) -> Result<Entity, Error> {
    let aggregated = self.fetch_entity_mutations_metadata(entity_id)?;
    let traits = self.fetch_entity_traits(&aggregated, false);

    let creation_date = aggregated.creation_date.map(|t| t.to_proto_timestamp());
    let modification_date = aggregated.modification_date.map(|t| t.to_proto_timestamp());
    let deletion_date = aggregated.deletion_date.map(|t| t.to_proto_timestamp());

    let entity = Entity {
        id: entity_id.to_string(),
        traits,
        creation_date,
        modification_date,
        deletion_date,
        last_operation_id: aggregated.last_operation_id,
    };

    Ok(entity)
}
/// Fetches the mutations metadata of an entity from both the pending and
/// chain indices and aggregates them.
///
/// Mutations of the pending index are fed to the aggregator before those of
/// the chain index.
pub fn fetch_entity_mutations_metadata(
    &self,
    entity_id: &str,
) -> Result<EntityAggregator, Error> {
    let from_pending = self.pending_index.fetch_entity_mutations(entity_id)?;
    let from_chain = self.chain_index.fetch_entity_mutations(entity_id)?;

    let pending_mutations = from_pending.mutations.iter();
    let chain_mutations = from_chain.mutations.iter();
    let all_mutations = pending_mutations.chain(chain_mutations).cloned();

    EntityAggregator::new(all_mutations)
}
/// Fetches and sets the traits of each entity of the results.
///
/// Entities whose aggregated mutations report that they should be collected
/// are flagged to the garbage collector along the way.
fn populate_results_traits(
    &self,
    entity_results: &mut Vec<EntityResult>,
    include_deleted: bool,
) {
    for entity_result in entity_results.iter_mut() {
        let aggregated = &entity_result.mutations;
        if aggregated.should_collect() {
            self.gc.maybe_flag_for_collection(aggregated);
        }

        let traits = self.fetch_entity_traits(aggregated, include_deleted);
        match entity_result.proto.entity.as_mut() {
            Some(entity) => entity.traits = traits,
            None => {}
        }
    }
}
/// Fetches the traits of an entity from the chain layer, based on its
/// aggregated mutations.
///
/// A trait is left out when:
/// - its projection asks to skip it;
/// - it is deleted and `include_deleted` is false;
/// - it has no put mutation;
/// - its operation cannot be fetched, belongs to another entity, or is not
///   a trait put mutation (those cases are logged, except the last one).
///
/// Projections, when any, are applied on the trait; a projection failure is
/// only logged. Dates and last operation id come from the aggregator.
fn fetch_entity_traits(
    &self,
    entity_mutations: &EntityAggregator,
    include_deleted: bool,
) -> Vec<Trait> {
    entity_mutations
        .traits
        .iter()
        .filter_map(|(trait_id, agg)| {
            let skipped_by_projection = agg.projection.as_ref().map_or(false, |p| p.skip);
            let hidden_deletion = agg.deletion_date.is_some() && !include_deleted;
            if skipped_by_projection || hidden_deletion {
                return None;
            }

            let (mut_metadata, _put_mut_metadata) = agg.last_put_mutation()?;

            let fetched = self.fetch_chain_mutation_operation(
                mut_metadata.operation_id,
                mut_metadata.block_offset,
            );
            let mutation = match fetched {
                Ok(Some(mutation)) => mutation,
                other => {
                    error!(
                        "Couldn't fetch operation_id={} for entity_id={}: {:?}",
                        mut_metadata.operation_id, mut_metadata.entity_id, other
                    );
                    return None;
                }
            };

            // Guard against an operation that belongs to another entity.
            if mutation.entity_id != entity_mutations.entity_id {
                error!(
                    "Fetched from chain operation {} that didn't belong to entity {}, but entity {}",
                    mut_metadata.operation_id, entity_mutations.entity_id, mutation.entity_id
                );
                return None;
            }

            // Only a trait put mutation carries a trait.
            let mut trt = match mutation.mutation? {
                Mutation::PutTrait(put_mut) => put_mut.r#trait?,
                Mutation::DeleteTrait(_)
                | Mutation::DeleteEntity(_)
                | Mutation::DeleteOperations(_)
                | Mutation::Test(_) => return None,
            };

            if let Some(projection) = &agg.projection {
                let projected = project_trait_fields(
                    self.full_cell.cell().schemas().as_ref(),
                    &mut trt,
                    projection.as_ref(),
                );
                if let Err(err) = projected {
                    error!(
                        "Couldn't run projection on trait_id={} of entity_id={}: {:?}",
                        trait_id, mut_metadata.entity_id, err,
                    );
                }
            }

            // Dates and last operation come from the aggregated mutations.
            trt.creation_date = agg.creation_date.map(|d| d.to_proto_timestamp());
            trt.modification_date = agg.modification_date.map(|d| d.to_proto_timestamp());
            trt.deletion_date = agg.deletion_date.map(|d| d.to_proto_timestamp());
            trt.last_operation_id = agg.last_operation_id.unwrap_or_default();

            Some(trt)
        })
        .collect()
}
/// Fetches an operation from the chain layer and decodes it as an entity
/// mutation.
///
/// When `block_offset` is known, the operation is read directly from that
/// block; otherwise it is looked up by id only.
///
/// Returns `Ok(None)` if the operation doesn't exist or isn't an entry
/// operation, and an error if fetching or decoding fails.
fn fetch_chain_mutation_operation(
    &self,
    operation_id: OperationId,
    block_offset: Option<BlockOffset>,
) -> Result<Option<EntityMutation>, Error> {
    let fetched = match block_offset {
        Some(offset) => self.chain_handle.get_chain_operation(offset, operation_id)?,
        None => self.chain_handle.get_operation(operation_id)?,
    };

    let operation = match fetched {
        Some(operation) => operation,
        None => return Ok(None),
    };

    match operation.as_entry_data() {
        Ok(data) => {
            let mutation = EntityMutation::decode(data)?;
            Ok(Some(mutation))
        }
        Err(_) => Ok(None),
    }
}
}
/// Entity matched by a query, along with the data needed to build, order and
/// hash the final result.
pub struct EntityResult {
/// Mutation returned by the index that made this entity match.
pub matched_mutation: MutationMetadata,
/// Sort value of the matched mutation, used to order and page results.
pub ordering_value: OrderingValueWrapper,
/// Protobuf result returned to the caller; its entity's traits are filled
/// by `populate_results_traits`.
pub proto: EntityResultProto,
/// Aggregated mutations of the entity, shared with the search cache.
pub mutations: Rc<EntityAggregator>,
}