use crate::{
core::{
entities::VID,
storage::timeindex::{AsTime, EventTime},
},
db::{api::view::IndexSpec, graph::node::NodeView},
errors::GraphError,
prelude::*,
search::{
entity_index::EntityIndex,
fields::{NODE_ID, NODE_NAME, NODE_NAME_TOKENIZED, NODE_TYPE, NODE_TYPE_TOKENIZED},
get_reader, indexed_props, resolve_props, TOKENIZER,
},
};
use ahash::HashSet;
use raphtory_api::core::storage::{arc_str::ArcStr, dict_mapper::MaybeNew};
use raphtory_storage::graph::graph::GraphStorage;
use rayon::{iter::IntoParallelIterator, prelude::ParallelIterator};
use std::{
fmt::{Debug, Formatter},
path::PathBuf,
};
use tantivy::{
collector::TopDocs,
query::AllQuery,
schema::{
Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST,
INDEXED, STORED, STRING,
},
Document, IndexWriter, TantivyDocument, Term,
};
#[derive(Clone)]
pub struct NodeIndex {
pub(crate) entity_index: EntityIndex,
pub(crate) node_id_field: Field,
pub(crate) node_name_field: Field,
pub(crate) node_name_tokenized_field: Field,
pub(crate) node_type_field: Field,
pub(crate) node_type_tokenized_field: Field,
}
impl Debug for NodeIndex {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("NodeIndex")
.field("index", &self.entity_index.index)
.finish()
}
}
impl NodeIndex {
fn fetch_fields(schema: &Schema) -> Result<(Field, Field, Field, Field, Field), GraphError> {
let node_id_field = schema
.get_field(NODE_ID)
.map_err(|_| GraphError::IndexErrorMsg("Node ID field missing in schema.".into()))?;
let node_name_field = schema
.get_field(NODE_NAME)
.map_err(|_| GraphError::IndexErrorMsg("Node name field missing in schema.".into()))?;
let node_name_tokenized_field = schema.get_field(NODE_NAME_TOKENIZED).map_err(|_| {
GraphError::IndexErrorMsg("Tokenized node name field missing in schema.".into())
})?;
let node_type_field = schema
.get_field(NODE_TYPE)
.map_err(|_| GraphError::IndexErrorMsg("Node type field missing in schema.".into()))?;
let node_type_tokenized_field = schema.get_field(NODE_TYPE_TOKENIZED).map_err(|_| {
GraphError::IndexErrorMsg("Tokenized node type field missing in schema.".into())
})?;
Ok((
node_id_field,
node_name_field,
node_name_tokenized_field,
node_type_field,
node_type_tokenized_field,
))
}
pub(crate) fn new(path: &Option<PathBuf>) -> Result<Self, GraphError> {
let schema = Self::schema_builder().build();
let (
node_id_field,
node_name_field,
node_name_tokenized_field,
node_type_field,
node_type_tokenized_field,
) = Self::fetch_fields(&schema)?;
let entity_index = EntityIndex::new(schema, path)?;
Ok(Self {
entity_index,
node_id_field,
node_name_field,
node_name_tokenized_field,
node_type_field,
node_type_tokenized_field,
})
}
pub(crate) fn load_from_path(path: &PathBuf) -> Result<Self, GraphError> {
let entity_index = EntityIndex::load_nodes_index_from_path(path)?;
let schema = entity_index.index.schema();
let (
node_id_field,
node_name_field,
node_name_tokenized_field,
node_type_field,
node_type_tokenized_field,
) = Self::fetch_fields(&schema)?;
Ok(Self {
entity_index,
node_id_field,
node_name_field,
node_name_tokenized_field,
node_type_field,
node_type_tokenized_field,
})
}
pub(crate) fn resolve_metadata(&self) -> HashSet<usize> {
let props = self.entity_index.metadata_indexes.read_recursive();
resolve_props(&props)
}
pub(crate) fn resolve_properties(&self) -> HashSet<usize> {
let props = self.entity_index.temporal_property_indexes.read_recursive();
resolve_props(&props)
}
pub(crate) fn print(&self) -> Result<(), GraphError> {
let searcher = get_reader(&self.entity_index.index)?.searcher();
let top_docs = searcher.search(&AllQuery, &TopDocs::with_limit(1000))?;
println!("Total node doc count: {}", top_docs.len());
for (_score, doc_address) in top_docs {
let doc: TantivyDocument = searcher.doc(doc_address)?;
println!("Node doc: {:?}", doc.to_json(searcher.schema()));
}
let metadata_indexes = self.entity_index.metadata_indexes.read_recursive();
for property_index in metadata_indexes.iter().flatten() {
property_index.print()?;
}
let temporal_property_indexes =
self.entity_index.temporal_property_indexes.read_recursive();
for property_index in temporal_property_indexes.iter().flatten() {
property_index.print()?;
}
Ok(())
}
fn schema_builder() -> SchemaBuilder {
let mut schema_builder: SchemaBuilder = Schema::builder();
schema_builder.add_u64_field(NODE_ID, INDEXED | FAST | STORED);
schema_builder.add_text_field(NODE_NAME, STRING);
schema_builder.add_text_field(
NODE_NAME_TOKENIZED,
TextOptions::default().set_indexing_options(
TextFieldIndexing::default()
.set_tokenizer(TOKENIZER)
.set_index_option(IndexRecordOption::WithFreqsAndPositions),
),
);
schema_builder.add_text_field(NODE_TYPE, STRING);
schema_builder.add_text_field(
NODE_TYPE_TOKENIZED,
TextOptions::default().set_indexing_options(
TextFieldIndexing::default()
.set_tokenizer(TOKENIZER)
.set_index_option(IndexRecordOption::WithFreqsAndPositions),
),
);
schema_builder
}
pub fn get_node_field(&self, field_name: &str) -> tantivy::Result<Field> {
self.entity_index.index.schema().get_field(field_name)
}
pub fn get_tokenized_node_field(&self, field_name: &str) -> tantivy::Result<Field> {
self.entity_index
.index
.schema()
.get_field(format!("{field_name}_tokenized").as_ref())
}
fn create_document(
&self,
node_id: u64,
node_name: String,
node_type: Option<ArcStr>,
) -> TantivyDocument {
let mut document = TantivyDocument::new();
document.add_u64(self.node_id_field, node_id);
document.add_text(self.node_name_field, node_name.clone());
document.add_text(self.node_name_tokenized_field, node_name);
if let Some(node_type) = node_type {
document.add_text(self.node_type_field, node_type.clone());
document.add_text(self.node_type_tokenized_field, node_type);
}
document
}
fn index_node<'graph, G: GraphViewOps<'graph>>(
&self,
node: NodeView<'graph, G>,
writer: &IndexWriter,
) -> Result<(), GraphError> {
let node_id: u64 = usize::from(node.node) as u64;
let node_name = node.name();
let node_type = node.node_type();
let node_doc = self.create_document(node_id, node_name.clone(), node_type.clone());
writer.add_document(node_doc)?;
Ok(())
}
pub(crate) fn index_nodes_fields(&self, graph: &GraphStorage) -> Result<(), GraphError> {
let mut writer = self.entity_index.index.writer(100_000_000)?;
(0..graph.count_nodes())
.into_par_iter()
.try_for_each(|v_id| {
let node = NodeView::new_internal(graph, VID(v_id));
self.index_node(node, &writer)?;
Ok::<(), GraphError>(())
})?;
writer.commit()?;
Ok(())
}
pub(crate) fn index_nodes_props(
&self,
graph: &GraphStorage,
path: Option<PathBuf>,
index_spec: &IndexSpec,
) -> Result<(), GraphError> {
self.entity_index
.index_node_metadata(graph, index_spec, &path)?;
self.entity_index
.index_node_temporal_props(graph, index_spec, &path)?;
Ok(())
}
pub(crate) fn add_node_update(
&self,
graph: &GraphStorage,
t: EventTime,
node_id: MaybeNew<VID>,
props: &[(usize, Prop)],
) -> Result<(), GraphError> {
let node = graph
.node(VID(node_id.inner().as_u64() as usize))
.expect("Node for internal id should exist.")
.at(t.t());
let vid_u64 = node_id.inner().as_u64();
node_id
.if_new(|_| {
let mut writer = self.entity_index.index.writer(100_000_000)?;
let node_doc = self.create_document(vid_u64, node.name(), node.node_type());
writer.add_document(node_doc)?;
writer.commit()?;
Ok::<(), GraphError>(())
})
.transpose()?;
let indexes = self.entity_index.temporal_property_indexes.read_recursive();
for (prop_id, prop_value) in indexed_props(props, &indexes) {
if let Some(index) = &indexes[prop_id] {
let mut writer = index.index.writer(50_000_000)?;
let prop_doc =
index.create_node_temporal_property_document(t, vid_u64, &prop_value)?;
writer.add_document(prop_doc)?;
writer.commit()?;
}
}
Ok(())
}
pub(crate) fn add_node_metadata(
&self,
node_id: VID,
props: &[(usize, Prop)],
) -> Result<(), GraphError> {
let indexes = self.entity_index.metadata_indexes.read_recursive();
for (prop_id, prop_value) in indexed_props(props, &indexes) {
if let Some(index) = &indexes[prop_id] {
let prop_doc =
index.create_node_metadata_document(node_id.as_u64(), &prop_value)?;
let mut writer = index.index.writer(50_000_000)?;
writer.add_document(prop_doc)?;
writer.commit()?;
}
}
Ok(())
}
pub(crate) fn update_node_metadata(
&self,
node_id: VID,
props: &[(usize, Prop)],
) -> Result<(), GraphError> {
let indexes = self.entity_index.metadata_indexes.read_recursive();
for (prop_id, prop_value) in indexed_props(props, &indexes) {
if let Some(index) = &indexes[prop_id] {
let mut writer = index.index.writer(50_000_000)?;
let term = Term::from_field_u64(index.entity_id_field, node_id.as_u64());
writer.delete_term(term);
let prop_doc =
index.create_node_metadata_document(node_id.as_u64(), &prop_value)?;
writer.add_document(prop_doc)?;
writer.commit()?;
}
}
Ok(())
}
}