raphtory 0.17.0

raphtory, a temporal graph library
Documentation
use crate::{
    core::{entities::EID, storage::timeindex::EventTime},
    db::{api::view::IndexSpec, graph::edge::EdgeView},
    errors::GraphError,
    prelude::*,
    search::{
        entity_index::EntityIndex,
        fields::{DESTINATION, DESTINATION_TOKENIZED, EDGE_ID, SOURCE, SOURCE_TOKENIZED},
        get_reader, indexed_props, resolve_props, TOKENIZER,
    },
};
use ahash::HashSet;
use raphtory_api::core::storage::dict_mapper::MaybeNew;
use raphtory_storage::{
    core_ops::CoreGraphOps,
    graph::{edges::edge_storage_ops::EdgeStorageOps, graph::GraphStorage},
};
use rayon::{iter::IntoParallelIterator, prelude::ParallelIterator};
use std::{
    fmt::{Debug, Formatter},
    path::PathBuf,
};
use tantivy::{
    collector::TopDocs,
    query::AllQuery,
    schema::{
        Field, IndexRecordOption, Schema, SchemaBuilder, TextFieldIndexing, TextOptions, FAST,
        INDEXED, STORED, STRING,
    },
    Document, IndexWriter, TantivyDocument, Term,
};

#[derive(Clone)]
pub struct EdgeIndex {
    pub(crate) entity_index: EntityIndex,
    pub(crate) edge_id_field: Field,
    pub(crate) src_field: Field,
    pub(crate) src_tokenized_field: Field,
    pub(crate) dst_field: Field,
    pub(crate) dst_tokenized_field: Field,
}

impl Debug for EdgeIndex {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("EdgeIndex")
            .field("index", &self.entity_index.index)
            .finish()
    }
}

impl EdgeIndex {
    fn fetch_fields(schema: &Schema) -> Result<(Field, Field, Field, Field, Field), GraphError> {
        let edge_id_field = schema
            .get_field(EDGE_ID)
            .map_err(|_| GraphError::IndexErrorMsg("Edge ID field missing in schema.".into()))?;

        let src_field = schema
            .get_field(SOURCE)
            .map_err(|_| GraphError::IndexErrorMsg("Source field missing in schema.".into()))?;

        let src_tokenized_field = schema.get_field(SOURCE_TOKENIZED).map_err(|_| {
            GraphError::IndexErrorMsg("Tokenized source field missing in schema.".into())
        })?;

        let dst_field = schema.get_field(DESTINATION).map_err(|_| {
            GraphError::IndexErrorMsg("Destination field missing in schema.".into())
        })?;

        let dst_tokenized_field = schema.get_field(DESTINATION_TOKENIZED).map_err(|_| {
            GraphError::IndexErrorMsg("Tokenized destination field missing in schema.".into())
        })?;

        Ok((
            edge_id_field,
            src_field,
            src_tokenized_field,
            dst_field,
            dst_tokenized_field,
        ))
    }

    pub(crate) fn new(path: &Option<PathBuf>) -> Result<Self, GraphError> {
        let schema = Self::schema_builder().build();
        let (edge_id_field, src_field, src_tokenized_field, dst_field, dst_tokenized_field) =
            Self::fetch_fields(&schema)?;

        let entity_index = EntityIndex::new(schema, path)?;

        Ok(Self {
            entity_index,
            edge_id_field,
            src_field,
            src_tokenized_field,
            dst_field,
            dst_tokenized_field,
        })
    }

    pub(crate) fn load_from_path(path: &PathBuf) -> Result<Self, GraphError> {
        let entity_index = EntityIndex::load_edges_index_from_path(path)?;
        let schema = entity_index.index.schema();
        let (edge_id_field, src_field, src_tokenized_field, dst_field, dst_tokenized_field) =
            Self::fetch_fields(&schema)?;

        Ok(Self {
            entity_index,
            edge_id_field,
            src_field,
            src_tokenized_field,
            dst_field,
            dst_tokenized_field,
        })
    }

    pub(crate) fn resolve_metadata(&self) -> HashSet<usize> {
        let props = self.entity_index.metadata_indexes.read_recursive();
        resolve_props(&props)
    }

    pub(crate) fn resolve_properties(&self) -> HashSet<usize> {
        let props = self.entity_index.temporal_property_indexes.read_recursive();
        resolve_props(&props)
    }

    pub(crate) fn print(&self) -> Result<(), GraphError> {
        let searcher = get_reader(&self.entity_index.index)?.searcher();
        let top_docs = searcher.search(&AllQuery, &TopDocs::with_limit(1000))?;

        println!("Total edge doc count: {}", top_docs.len());

        for (_score, doc_address) in top_docs {
            let doc: TantivyDocument = searcher.doc(doc_address)?;
            println!("Edge doc: {:?}", doc.to_json(searcher.schema()));
        }

        let metadata_indexes = self.entity_index.metadata_indexes.read_recursive();
        for property_index in metadata_indexes.iter().flatten() {
            property_index.print()?;
        }

        let temporal_property_indexes =
            self.entity_index.temporal_property_indexes.read_recursive();
        for property_index in temporal_property_indexes.iter().flatten() {
            property_index.print()?;
        }

        Ok(())
    }

    fn schema_builder() -> SchemaBuilder {
        let mut schema_builder = Schema::builder();
        schema_builder.add_u64_field(EDGE_ID, INDEXED | FAST | STORED);
        schema_builder.add_text_field(SOURCE, STRING);
        schema_builder.add_text_field(
            SOURCE_TOKENIZED,
            TextOptions::default().set_indexing_options(
                TextFieldIndexing::default()
                    .set_tokenizer(TOKENIZER)
                    .set_index_option(IndexRecordOption::WithFreqsAndPositions),
            ),
        );
        schema_builder.add_text_field(DESTINATION, STRING);
        schema_builder.add_text_field(
            DESTINATION_TOKENIZED,
            TextOptions::default().set_indexing_options(
                TextFieldIndexing::default()
                    .set_tokenizer(TOKENIZER)
                    .set_index_option(IndexRecordOption::WithFreqsAndPositions),
            ),
        );
        schema_builder
    }

    pub fn get_edge_field(&self, field_name: &str) -> tantivy::Result<Field> {
        self.entity_index.index.schema().get_field(field_name)
    }

    pub fn get_tokenized_edge_field(&self, field_name: &str) -> tantivy::Result<Field> {
        self.entity_index
            .index
            .schema()
            .get_field(format!("{field_name}_tokenized").as_ref())
    }

    fn create_document(&self, edge_id: u64, src: String, dst: String) -> TantivyDocument {
        let mut document = TantivyDocument::new();
        document.add_u64(self.edge_id_field, edge_id);
        document.add_text(self.src_field, src.clone());
        document.add_text(self.src_tokenized_field, src);
        document.add_text(self.dst_field, dst.clone());
        document.add_text(self.dst_tokenized_field, dst);
        document
    }

    fn index_edge<'graph, G: GraphViewOps<'graph>>(
        &self,
        edge: EdgeView<G>,
        writer: &IndexWriter,
    ) -> Result<(), GraphError> {
        let edge_id = edge.edge.pid().as_u64();
        let src = edge.src().name();
        let dst = edge.dst().name();

        let edge_doc = self.create_document(edge_id, src, dst);
        writer.add_document(edge_doc)?;

        Ok(())
    }

    pub(crate) fn index_edges_fields(&self, graph: &GraphStorage) -> Result<(), GraphError> {
        let mut writer = self.entity_index.index.writer(100_000_000)?;
        (0..graph.count_edges())
            .into_par_iter()
            .try_for_each(|e_id| {
                let edge = graph.core_edge(EID(e_id));
                let e_view = EdgeView::new(graph, edge.out_ref());
                self.index_edge(e_view, &writer)?;
                Ok::<(), GraphError>(())
            })?;
        writer.commit()?;
        Ok(())
    }

    pub(crate) fn index_edges_props(
        &self,
        graph: &GraphStorage,
        path: Option<PathBuf>,
        index_spec: &IndexSpec,
    ) -> Result<(), GraphError> {
        self.entity_index
            .index_edge_metadata(graph, index_spec, &path)?;
        self.entity_index
            .index_edge_temporal_props(graph, index_spec, &path)?;
        Ok(())
    }

    pub(crate) fn add_edge_update(
        &self,
        graph: &GraphStorage,
        edge_id: MaybeNew<EID>,
        t: EventTime,
        layer_id: usize,
        props: &[(usize, Prop)],
    ) -> Result<(), GraphError> {
        let eid_u64 = edge_id.inner().as_u64();

        // Check if the edge document is already in the index,
        // if it does skip adding a new doc for same edge
        edge_id
            .if_new(|eid| {
                let mut writer = self.entity_index.index.writer(100_000_000)?;
                let ese = graph.core_edge(eid);
                let src = graph.node_name(ese.src());
                let dst = graph.node_name(ese.dst());
                let edge_doc = self.create_document(eid_u64, src, dst);
                writer.add_document(edge_doc)?;
                writer.commit()?;
                Ok::<(), GraphError>(())
            })
            .transpose()?;

        let indexes = self.entity_index.temporal_property_indexes.read_recursive();
        for (prop_id, prop_value) in indexed_props(props, &indexes) {
            if let Some(index) = &indexes[prop_id] {
                let mut writer = index.index.writer(50_000_000)?;
                let prop_doc = index.create_edge_temporal_property_document(
                    t,
                    eid_u64,
                    layer_id,
                    &prop_value,
                )?;
                writer.add_document(prop_doc)?;
                writer.commit()?;
            }
        }
        Ok(())
    }

    pub(crate) fn add_edge_metadata(
        &self,
        edge_id: EID,
        layer_id: usize,
        props: &[(usize, Prop)],
    ) -> Result<(), GraphError> {
        let indexes = self.entity_index.metadata_indexes.read_recursive();
        for (prop_id, prop_value) in indexed_props(props, &indexes) {
            if let Some(index) = &indexes[prop_id] {
                let prop_doc =
                    index.create_edge_metadata_document(edge_id.as_u64(), layer_id, &prop_value)?;
                let mut writer = index.index.writer(50_000_000)?;
                writer.add_document(prop_doc)?;
                writer.commit()?;
            }
        }
        Ok(())
    }

    pub(crate) fn update_edge_metadata(
        &self,
        edge_id: EID,
        layer_id: usize,
        props: &[(usize, Prop)],
    ) -> Result<(), GraphError> {
        let indexes = self.entity_index.metadata_indexes.read_recursive();
        for (prop_id, prop_value) in indexed_props(props, &indexes) {
            if let Some(index) = &indexes[prop_id] {
                let mut writer = index.index.writer(50_000_000)?;
                // Delete existing metadata document
                let term = Term::from_field_u64(index.entity_id_field, edge_id.as_u64());
                writer.delete_term(term);
                // Reindex metadata
                let prop_doc =
                    index.create_edge_metadata_document(edge_id.as_u64(), layer_id, &prop_value)?;
                writer.add_document(prop_doc)?;
                writer.commit()?;
            }
        }
        Ok(())
    }
}