raphtory 0.17.0

raphtory, a temporal graph library
Documentation
use crate::{
    errors::{into_graph_err, GraphError, LoadError},
    io::arrow::dataframe::DFChunk,
    prelude::AdditionOps,
};
use arrow::array::{Array, AsArray, LargeStringArray, StringArray, StringViewArray};
use iter_enum::{
    DoubleEndedIterator, ExactSizeIterator, IndexedParallelIterator, Iterator, ParallelIterator,
};
use rayon::prelude::*;

#[derive(Copy, Clone)]
pub(crate) enum LayerCol<'a> {
    Name { name: Option<&'a str>, len: usize },
    Utf8 { col: &'a StringArray },
    LargeUtf8 { col: &'a LargeStringArray },
    Utf8View { col: &'a StringViewArray },
}

#[derive(
    Iterator, DoubleEndedIterator, ExactSizeIterator, ParallelIterator, IndexedParallelIterator,
)]
pub enum LayerColVariants<Name, Utf8, LargeUtf8, Utf8View> {
    Name(Name),
    Utf8(Utf8),
    LargeUtf8(LargeUtf8),
    Utf8View(Utf8View),
}

impl<'a> LayerCol<'a> {
    pub fn par_iter(self) -> impl IndexedParallelIterator<Item = Option<&'a str>> {
        match self {
            LayerCol::Name { name, len } => {
                LayerColVariants::Name((0..len).into_par_iter().map(move |_| name))
            }
            LayerCol::Utf8 { col } => LayerColVariants::Utf8(
                (0..col.len())
                    .into_par_iter()
                    .map(|i| col.is_valid(i).then(|| col.value(i))),
            ),
            LayerCol::LargeUtf8 { col } => LayerColVariants::LargeUtf8(
                (0..col.len())
                    .into_par_iter()
                    .map(|i| col.is_valid(i).then(|| col.value(i))),
            ),

            LayerCol::Utf8View { col } => LayerColVariants::Utf8View(
                (0..col.len())
                    .into_par_iter()
                    .map(|i| col.is_valid(i).then(|| col.value(i))),
            ),
        }
    }

    #[allow(dead_code)]
    pub fn iter(self) -> impl Iterator<Item = Option<&'a str>> {
        match self {
            LayerCol::Name { name, len } => LayerColVariants::Name((0..len).map(move |_| name)),
            LayerCol::Utf8 { col } => LayerColVariants::Utf8(col.iter()),
            LayerCol::LargeUtf8 { col } => LayerColVariants::LargeUtf8(col.iter()),
            LayerCol::Utf8View { col } => LayerColVariants::Utf8View(col.iter()),
        }
    }

    pub fn resolve(
        self,
        graph: &(impl AdditionOps + Send + Sync),
    ) -> Result<Vec<usize>, GraphError> {
        match self {
            LayerCol::Name { name, len } => {
                let layer = graph.resolve_layer(name).map_err(into_graph_err)?.inner();
                Ok(vec![layer; len])
            }
            col => {
                let iter = col.par_iter();
                let mut res = vec![0usize; iter.len()];
                iter.zip(res.par_iter_mut())
                    .try_for_each(|(layer, entry)| {
                        let layer = graph.resolve_layer(layer).map_err(into_graph_err)?.inner();
                        *entry = layer;
                        Ok::<(), GraphError>(())
                    })?;
                Ok(res)
            }
        }
    }
}

pub(crate) fn lift_layer_col<'a>(
    layer_name: Option<&'a str>,
    layer_index: Option<usize>,
    df: &'a DFChunk,
) -> Result<LayerCol<'a>, GraphError> {
    match (layer_name, layer_index) {
        (name, None) => Ok(LayerCol::Name {
            name,
            len: df.len(),
        }),
        (None, Some(layer_index)) => {
            let col = &df.chunk[layer_index];
            if let Some(col) = col.as_string_opt() {
                Ok(LayerCol::Utf8 { col })
            } else if let Some(col) = col.as_string_opt() {
                Ok(LayerCol::LargeUtf8 { col })
            } else if let Some(col) = col.as_string_view_opt() {
                Ok(LayerCol::Utf8View { col })
            } else {
                Err(LoadError::InvalidLayerType(col.data_type().clone()).into())
            }
        }
        _ => Err(GraphError::WrongNumOfArgs(
            "layer_name".to_string(),
            "layer_col".to_string(),
        )),
    }
}

pub(crate) fn lift_node_type_col<'a>(
    node_type_name: Option<&'a str>,
    node_type_index: Option<usize>,
    df: &'a DFChunk,
) -> Result<LayerCol<'a>, GraphError> {
    match (node_type_name, node_type_index) {
        (name, None) => Ok(LayerCol::Name {
            name,
            len: df.len(),
        }),
        (None, Some(layer_index)) => {
            let col = &df.chunk[layer_index];
            if let Some(col) = col.as_string_opt() {
                Ok(LayerCol::Utf8 { col })
            } else if let Some(col) = col.as_string_opt() {
                Ok(LayerCol::LargeUtf8 { col })
            } else if let Some(col) = col.as_string_view_opt() {
                Ok(LayerCol::Utf8View { col })
            } else {
                Err(LoadError::InvalidNodeType(col.data_type().clone()).into())
            }
        }
        _ => Err(GraphError::WrongNumOfArgs(
            "node_type_name".to_string(),
            "node_type_col".to_string(),
        )),
    }
}