raphtory 0.17.0

raphtory, a temporal graph library
Documentation
use crate::{
    core::utils::iter::GenLockedIter,
    db::graph::node::NodeView,
    errors::GraphError,
    serialise::parquet::{
        model::{ParquetCNode, ParquetTNode},
        run_encode, NODES_C_PATH, NODES_T_PATH, NODE_ID, TIME_COL, TYPE_COL,
    },
};
use arrow::datatypes::{DataType, Field};
use itertools::Itertools;
use raphtory_api::{core::entities::VID, iter::IntoDynBoxed};
use raphtory_storage::graph::graph::GraphStorage;
use std::path::Path;

pub(crate) fn encode_nodes_tprop(
    g: &GraphStorage,
    path: impl AsRef<Path>,
) -> Result<(), GraphError> {
    run_encode(
        g,
        g.node_meta().temporal_prop_mapper(),
        g.unfiltered_num_nodes(),
        path,
        NODES_T_PATH,
        |id_type| {
            vec![
                Field::new(NODE_ID, id_type.clone(), false),
                Field::new(TIME_COL, DataType::Int64, false),
                Field::new(TYPE_COL, DataType::Utf8, true),
            ]
        },
        |nodes, g, decoder, writer| {
            let row_group_size = 100_000;

            let cols = g
                .node_meta()
                .temporal_prop_mapper()
                .get_keys()
                .into_iter()
                .collect_vec();
            let cols = &cols;
            for node_rows in nodes
                .into_iter()
                .map(VID)
                .map(|vid| NodeView::new_internal(g, vid))
                .flat_map(move |node| {
                    GenLockedIter::from(node, |node| {
                        node.rows()
                            .map(|(t, props)| ParquetTNode {
                                node: *node,
                                cols,
                                t,
                                props,
                            })
                            .into_dyn_boxed()
                    })
                })
                .chunks(row_group_size)
                .into_iter()
                .map(|chunk| chunk.collect_vec())
            {
                decoder.serialize(&node_rows)?;
                if let Some(rb) = decoder.flush()? {
                    writer.write(&rb)?;
                    writer.flush()?;
                }
            }
            Ok(())
        },
    )
}

pub(crate) fn encode_nodes_cprop(
    g: &GraphStorage,
    path: impl AsRef<Path>,
) -> Result<(), GraphError> {
    run_encode(
        g,
        g.node_meta().metadata_mapper(),
        g.unfiltered_num_nodes(),
        path,
        NODES_C_PATH,
        |id_type| {
            vec![
                Field::new(NODE_ID, id_type.clone(), false),
                Field::new(TYPE_COL, DataType::Utf8, true),
            ]
        },
        |nodes, g, decoder, writer| {
            let row_group_size = 100_000;

            for node_rows in nodes
                .into_iter()
                .map(VID)
                .map(|vid| NodeView::new_internal(g, vid))
                .map(move |node| ParquetCNode { node })
                .chunks(row_group_size)
                .into_iter()
                .map(|chunk| chunk.collect_vec())
            {
                decoder.serialize(&node_rows)?;
                if let Some(rb) = decoder.flush()? {
                    writer.write(&rb)?;
                    writer.flush()?;
                }
            }
            Ok(())
        },
    )
}