// raphtory 0.17.0
//
// raphtory, a temporal graph library
// Documentation
use crate::{
    db::api::view::IndexSpec,
    errors::GraphError,
    prelude::GraphViewOps,
    search::{
        fields, get_props, new_index, property_index::PropertyIndex, register_default_tokenizers,
    },
};
use parking_lot::RwLock;
use raphtory_api::core::entities::{
    properties::{meta::Meta, prop::PropType, tprop::TPropOps},
    LayerIds, EID, VID,
};
use raphtory_storage::{
    core_ops::CoreGraphOps,
    graph::{
        edges::edge_storage_ops::EdgeStorageOps, graph::GraphStorage,
        nodes::node_storage_ops::NodeStorageOps,
    },
};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::{
    borrow::Borrow,
    path::{Path, PathBuf},
    sync::Arc,
};
use tantivy::{
    schema::{Schema, SchemaBuilder, FAST, INDEXED, STORED},
    Index,
};

/// Search index for one kind of graph entity together with its per-property indexes.
///
/// Every field sits behind an `Arc`, so the derived `Clone` shares the underlying
/// index and the two lock-protected vectors rather than copying them.
#[derive(Clone)]
pub struct EntityIndex {
    /// Main tantivy index; built from / opened in the `fields` sub-directory when a
    /// path is supplied.
    pub(crate) index: Arc<Index>,
    /// Metadata property indexes. The slot position is the property id; `None` means
    /// no index has been created for that id.
    pub(crate) metadata_indexes: Arc<RwLock<Vec<Option<PropertyIndex>>>>,
    /// Temporal property indexes, laid out the same way (slot position = property id).
    pub(crate) temporal_property_indexes: Arc<RwLock<Vec<Option<PropertyIndex>>>>,
}

impl EntityIndex {
    /// Creates an entity index that has no per-property indexes yet.
    ///
    /// The main index is produced by `new_index` from `schema`. When `path` is `Some`,
    /// `new_index` receives the `fields` sub-directory of that path; otherwise it
    /// receives `None`.
    ///
    /// # Errors
    /// Propagates any error returned by `new_index`.
    pub(crate) fn new(schema: Schema, path: &Option<PathBuf>) -> Result<Self, GraphError> {
        let fields_dir = path.as_deref().map(|root| root.join("fields"));
        let fields_index = new_index(schema, &fields_dir)?;

        let metadata_indexes = Arc::new(RwLock::new(Vec::new()));
        let temporal_property_indexes = Arc::new(RwLock::new(Vec::new()));

        Ok(Self {
            index: Arc::new(fields_index),
            metadata_indexes,
            temporal_property_indexes,
        })
    }

    /// Opens a previously persisted entity index rooted at `path`.
    ///
    /// The main index is opened from `<path>/fields` and the default tokenizers are
    /// registered on it. Property indexes are then loaded via `PropertyIndex::load_all`
    /// from `<path>/const_properties` (metadata) and `<path>/temporal_properties`,
    /// forwarding `is_edge` to both calls.
    ///
    /// # Errors
    /// Propagates errors from opening the main index or loading either property set.
    fn load_from_path(path: &Path, is_edge: bool) -> Result<Self, GraphError> {
        let fields_index = Index::open_in_dir(path.join("fields"))?;
        register_default_tokenizers(&fields_index);

        let metadata_dir = path.join("const_properties");
        let metadata = PropertyIndex::load_all(&metadata_dir, is_edge)?;

        let temporal_dir = path.join("temporal_properties");
        let temporal = PropertyIndex::load_all(&temporal_dir, is_edge)?;

        Ok(Self {
            index: Arc::new(fields_index),
            metadata_indexes: Arc::new(RwLock::new(metadata)),
            temporal_property_indexes: Arc::new(RwLock::new(temporal)),
        })
    }

    pub(crate) fn load_nodes_index_from_path(path: &PathBuf) -> Result<Self, GraphError> {
        EntityIndex::load_from_path(path, false)
    }

    pub(crate) fn load_edges_index_from_path(path: &PathBuf) -> Result<Self, GraphError> {
        EntityIndex::load_from_path(path, true)
    }

    /// Ensures that slot `prop_id` of `indexes` holds a property index.
    ///
    /// Takes the write lock, grows the vector with `None` entries until `prop_id` is a
    /// valid position, and returns early if the slot is already populated. Otherwise it
    /// starts a schema with `PropertyIndex::schema_builder`, lets `add_schema_fields`
    /// append further fields, and calls `new_property` with the built schema and, when
    /// `index_path` is `Some`, the sub-directory `<index_path>/<prop_id>`.
    ///
    /// # Errors
    /// Propagates any error returned by `new_property`; the slot stays `None` in that case.
    fn init_prop_indexes(
        &self,
        indexes: &Arc<RwLock<Vec<Option<PropertyIndex>>>>,
        prop_id: usize,
        prop_name: String,
        prop_type: PropType,
        index_path: Option<PathBuf>,
        add_schema_fields: fn(&mut SchemaBuilder),
        new_property: fn(Schema, &Option<PathBuf>) -> Result<PropertyIndex, GraphError>,
    ) -> Result<(), GraphError> {
        let mut slots = indexes.write();

        // Make sure `prop_id` is addressable before touching the slot.
        if slots.len() <= prop_id {
            slots.resize(prop_id + 1, None);
        }

        // Nothing to do when an index for this property already exists.
        if slots[prop_id].is_some() {
            return Ok(());
        }

        let mut builder = PropertyIndex::schema_builder(prop_name.as_str(), prop_type.clone());
        add_schema_fields(&mut builder);
        let schema = builder.build();

        let prop_dir = index_path
            .as_deref()
            .map(|root| root.join(prop_id.to_string()));
        slots[prop_id] = Some(new_property(schema, &prop_dir)?);
        Ok(())
    }

    /// Builds the node metadata property indexes selected by `index_spec`.
    ///
    /// For every `(name, id, type)` triple that `get_props` yields for
    /// `index_spec.node_metadata`, this ensures a property index exists in
    /// `self.metadata_indexes` (base directory `<path>/const_properties` when `path` is
    /// `Some`; the schema gains a `NODE_ID` field). It then visits all node ids in
    /// parallel, adds one document for each node that has a value for the property,
    /// and commits the writer.
    ///
    /// # Errors
    /// Propagates any error from index creation, writer creation, document
    /// construction, `add_document`, or `commit`.
    pub(crate) fn index_node_metadata(
        &self,
        graph: &GraphStorage,
        index_spec: &IndexSpec,
        path: &Option<PathBuf>,
    ) -> Result<(), GraphError> {
        // Budget handed to each tantivy writer opened below.
        const WRITER_BUDGET: usize = 50_000_000;

        let mapper = graph.node_meta().metadata_mapper();
        for (prop_name, prop_id, prop_type) in get_props(&index_spec.node_metadata, mapper) {
            let base_dir = path.as_deref().map(|root| root.join("const_properties"));
            self.init_prop_indexes(
                &self.metadata_indexes,
                prop_id,
                prop_name,
                prop_type,
                base_dir,
                |builder| {
                    builder.add_u64_field(fields::NODE_ID, INDEXED | FAST | STORED);
                },
                PropertyIndex::new_node_property,
            )?;

            // The read guard stays alive while documents are written and committed.
            let slots = self.metadata_indexes.read_recursive();
            if let Some(prop_index) = slots[prop_id].as_ref() {
                let mut writer = prop_index.index.writer(WRITER_BUDGET)?;
                let node_count = graph.count_nodes();
                (0..node_count).into_par_iter().try_for_each(
                    |v_id| -> Result<(), GraphError> {
                        let node = graph.core_node(VID(v_id));
                        if let Some(value) = node.prop(prop_id) {
                            let doc =
                                prop_index.create_node_metadata_document(v_id as u64, &value)?;
                            writer.add_document(doc)?;
                        }
                        Ok(())
                    },
                )?;

                writer.commit()?;
            }
        }
        Ok(())
    }

    /// Builds the node temporal property indexes selected by `index_spec`.
    ///
    /// For every `(name, id, type)` triple that `get_props` yields for
    /// `index_spec.node_properties`, this ensures a property index exists in
    /// `self.temporal_property_indexes` (base directory `<path>/temporal_properties`
    /// when `path` is `Some`; the schema gains `TIME`, `EVENT_ID` and `NODE_ID`
    /// fields). It then visits all node ids in parallel, adds one document per
    /// `(time, value)` entry of the node's temporal property, and commits the writer.
    ///
    /// # Errors
    /// Propagates any error from index creation, writer creation, document
    /// construction, `add_document`, or `commit`.
    pub(crate) fn index_node_temporal_props(
        &self,
        graph: &GraphStorage,
        index_spec: &IndexSpec,
        path: &Option<PathBuf>,
    ) -> Result<(), GraphError> {
        // Budget handed to each tantivy writer opened below.
        const WRITER_BUDGET: usize = 50_000_000;

        let mapper = graph.node_meta().temporal_prop_mapper();
        for (prop_name, prop_id, prop_type) in get_props(&index_spec.node_properties, mapper) {
            let base_dir = path
                .as_deref()
                .map(|root| root.join("temporal_properties"));
            self.init_prop_indexes(
                &self.temporal_property_indexes,
                prop_id,
                prop_name,
                prop_type,
                base_dir,
                |builder| {
                    builder.add_i64_field(fields::TIME, INDEXED | FAST | STORED);
                    builder.add_u64_field(fields::EVENT_ID, INDEXED | FAST | STORED);
                    builder.add_u64_field(fields::NODE_ID, INDEXED | FAST | STORED);
                },
                PropertyIndex::new_node_property,
            )?;

            // The read guard stays alive while documents are written and committed.
            let slots = self.temporal_property_indexes.read_recursive();
            if let Some(prop_index) = slots[prop_id].as_ref() {
                let mut writer = prop_index.index.writer(WRITER_BUDGET)?;
                let node_count = graph.count_nodes();
                (0..node_count).into_par_iter().try_for_each(
                    |v_id| -> Result<(), GraphError> {
                        let node = graph.core_node(VID(v_id));
                        for (t, value) in node.tprop(prop_id).iter() {
                            let doc = prop_index.create_node_temporal_property_document(
                                t.into(),
                                v_id as u64,
                                &value,
                            )?;
                            writer.add_document(doc)?;
                        }
                        Ok(())
                    },
                )?;

                writer.commit()?;
            }
        }
        Ok(())
    }

    /// Builds the edge metadata property indexes selected by `index_spec`.
    ///
    /// For every `(name, id, type)` triple that `get_props` yields for
    /// `index_spec.edge_metadata`, this ensures a property index exists in
    /// `self.metadata_indexes` (base directory `<path>/const_properties` when `path` is
    /// `Some`; the schema gains `EDGE_ID` and `LAYER_ID` fields). It then visits all
    /// edge ids in parallel, adds one document per `(layer, value)` pair returned by
    /// `metadata_iter` over all layers, and commits the writer.
    ///
    /// # Errors
    /// Propagates any error from index creation, writer creation, document
    /// construction, `add_document`, or `commit`.
    pub(crate) fn index_edge_metadata(
        &self,
        graph: &GraphStorage,
        index_spec: &IndexSpec,
        path: &Option<PathBuf>,
    ) -> Result<(), GraphError> {
        // Budget handed to each tantivy writer opened below.
        const WRITER_BUDGET: usize = 50_000_000;

        let mapper = graph.edge_meta().metadata_mapper();
        for (prop_name, prop_id, prop_type) in get_props(&index_spec.edge_metadata, mapper) {
            let base_dir = path.as_deref().map(|root| root.join("const_properties"));
            self.init_prop_indexes(
                &self.metadata_indexes,
                prop_id,
                prop_name,
                prop_type,
                base_dir,
                |builder| {
                    builder.add_u64_field(fields::EDGE_ID, INDEXED | FAST | STORED);
                    builder.add_u64_field(fields::LAYER_ID, INDEXED | FAST | STORED);
                },
                PropertyIndex::new_edge_property,
            )?;

            // The read guard stays alive while documents are written and committed.
            let slots = self.metadata_indexes.read_recursive();
            if let Some(prop_index) = slots[prop_id].as_ref() {
                let mut writer = prop_index.index.writer(WRITER_BUDGET)?;
                let edge_count = graph.count_edges();
                (0..edge_count).into_par_iter().try_for_each(
                    |e_id| -> Result<(), GraphError> {
                        let edge = graph.core_edge(EID(e_id));
                        for (layer_id, value) in edge.metadata_iter(&LayerIds::All, prop_id) {
                            let doc = prop_index.create_edge_metadata_document(
                                e_id as u64,
                                layer_id,
                                value.borrow(),
                            )?;
                            writer.add_document(doc)?;
                        }
                        Ok(())
                    },
                )?;
                writer.commit()?;
            }
        }
        Ok(())
    }

    /// Builds the edge temporal property indexes selected by `index_spec`.
    ///
    /// For every `(name, id, type)` triple that `get_props` yields for
    /// `index_spec.edge_properties`, this ensures a property index exists in
    /// `self.temporal_property_indexes` (base directory `<path>/temporal_properties`
    /// when `path` is `Some`; the schema gains `TIME`, `EVENT_ID`, `EDGE_ID` and
    /// `LAYER_ID` fields). It then visits all edge ids in parallel and, for each layer
    /// returned by `temporal_prop_iter` over all layers, adds one document per
    /// `(time, value)` entry before committing the writer.
    ///
    /// # Errors
    /// Propagates any error from index creation, writer creation, document
    /// construction, `add_document`, or `commit`.
    pub(crate) fn index_edge_temporal_props(
        &self,
        graph: &GraphStorage,
        index_spec: &IndexSpec,
        path: &Option<PathBuf>,
    ) -> Result<(), GraphError> {
        // Budget handed to each tantivy writer opened below.
        const WRITER_BUDGET: usize = 50_000_000;

        let mapper = graph.edge_meta().temporal_prop_mapper();
        for (prop_name, prop_id, prop_type) in get_props(&index_spec.edge_properties, mapper) {
            let base_dir = path
                .as_deref()
                .map(|root| root.join("temporal_properties"));
            self.init_prop_indexes(
                &self.temporal_property_indexes,
                prop_id,
                prop_name,
                prop_type,
                base_dir,
                |builder| {
                    builder.add_i64_field(fields::TIME, INDEXED | FAST | STORED);
                    builder.add_u64_field(fields::EVENT_ID, INDEXED | FAST | STORED);
                    builder.add_u64_field(fields::EDGE_ID, INDEXED | FAST | STORED);
                    builder.add_u64_field(fields::LAYER_ID, INDEXED | FAST | STORED);
                },
                PropertyIndex::new_edge_property,
            )?;

            // The read guard stays alive while documents are written and committed.
            let slots = self.temporal_property_indexes.read_recursive();
            if let Some(prop_index) = slots[prop_id].as_ref() {
                let mut writer = prop_index.index.writer(WRITER_BUDGET)?;
                let edge_count = graph.count_edges();
                (0..edge_count).into_par_iter().try_for_each(
                    |e_id| -> Result<(), GraphError> {
                        let edge = graph.core_edge(EID(e_id));
                        for (layer_id, layer_tprop) in
                            edge.temporal_prop_iter(&LayerIds::All, prop_id)
                        {
                            for (t, value) in layer_tprop.iter() {
                                let doc = prop_index.create_edge_temporal_property_document(
                                    t,
                                    e_id as u64,
                                    layer_id,
                                    &value,
                                )?;
                                writer.add_document(doc)?;
                            }
                        }
                        Ok(())
                    },
                )?;
                writer.commit()?;
            }
        }
        Ok(())
    }

    /// Looks up the property index stored at slot `prop_id` of `indexes`.
    ///
    /// Returns `None` when `prop_id` is `None`, when the id is past the end of the
    /// vector, or when the slot is empty. Otherwise the stored `PropertyIndex` is
    /// cloned under a recursive read lock, wrapped in a fresh `Arc`, and returned
    /// together with the id.
    fn fetch_property_index(
        &self,
        indexes: &Arc<RwLock<Vec<Option<PropertyIndex>>>>,
        prop_id: Option<usize>,
    ) -> Option<(Arc<PropertyIndex>, usize)> {
        let id = prop_id?;
        let slots = indexes.read_recursive();
        let property_index = slots.get(id)?.as_ref()?.clone();
        Some((Arc::new(property_index), id))
    }

    /// Resolves `prop_name` through `meta.metadata_mapper()` and fetches the matching
    /// metadata property index.
    ///
    /// Yields `Ok(None)` when the name has no id or no index is stored for that id.
    /// The current implementation never returns `Err`.
    pub(crate) fn get_metadata_index(
        &self,
        meta: &Meta,
        prop_name: &str,
    ) -> Result<Option<(Arc<PropertyIndex>, usize)>, GraphError> {
        let prop_id = meta.metadata_mapper().get_id(prop_name);
        let found = self.fetch_property_index(&self.metadata_indexes, prop_id);
        Ok(found)
    }

    /// Resolves `prop_name` through `meta.temporal_prop_mapper()` and fetches the
    /// matching temporal property index.
    ///
    /// Yields `Ok(None)` when the name has no id or no index is stored for that id.
    /// The current implementation never returns `Err`.
    pub(crate) fn get_temporal_property_index(
        &self,
        meta: &Meta,
        prop_name: &str,
    ) -> Result<Option<(Arc<PropertyIndex>, usize)>, GraphError> {
        let prop_id = meta.temporal_prop_mapper().get_id(prop_name);
        let found = self.fetch_property_index(&self.temporal_property_indexes, prop_id);
        Ok(found)
    }
}