raphtory 0.17.0

raphtory, a temporal graph library
Documentation
use bigdecimal::BigDecimal;
use chrono::NaiveDateTime;
use raphtory_api::core::{
    entities::properties::prop::{Prop, PropType, PropUnwrap},
    storage::{arc_str::ArcStr, timeindex::EventTime},
};
use rustc_hash::FxHashMap;
use std::{
    collections::{HashMap, HashSet},
    fmt::{Debug, Formatter},
    iter::Zip,
    sync::Arc,
};

use crate::db::api::{
    properties::internal::InternalPropertiesOps,
    view::{history::History, BoxedLIter},
};
use raphtory_api::core::{storage::timeindex::AsTime, utils::time::IntoTime};
#[cfg(feature = "arrow")]
use {arrow::array::ArrayRef, raphtory_api::core::entities::properties::prop::PropArrayUnwrap};

/// View over a single temporal property of a property source.
///
/// The view pairs the property source `props` with the `id` of the temporal
/// property it reads from that source.
#[derive(Clone)]
pub struct TemporalPropertyView<P>
where
    P: InternalPropertiesOps,
{
    /// Id of the temporal property inside `props`.
    pub(crate) id: usize,
    /// Property source that `id` is resolved against.
    pub(crate) props: P,
}

impl<P: InternalPropertiesOps + Clone> Debug for TemporalPropertyView<P> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TemporalPropertyView")
            .field("history", &self.iter().collect::<Vec<_>>())
            .finish()
    }
}

impl<P: InternalPropertiesOps + Clone> PartialEq for TemporalPropertyView<P> {
    /// Two views are equal when [`TemporalPropertyView::iter`] yields the same
    /// `(time, value)` pairs, in the same order, for both of them.
    fn eq(&self, other: &Self) -> bool {
        Iterator::eq(self.iter(), other.iter())
    }
}

impl<P, RHS, V> PartialEq<RHS> for TemporalPropertyView<P>
where
    P: InternalPropertiesOps + Clone,
    for<'a> &'a RHS: IntoIterator<Item = &'a (EventTime, V)>,
    V: Clone + Into<Prop>,
{
    /// Compares the view against any collection of `(time, value)` pairs whose
    /// values convert into [`Prop`]. The comparison is element-wise and
    /// order-sensitive.
    fn eq(&self, other: &RHS) -> bool {
        // Convert each borrowed `(time, value)` pair into an owned `(EventTime, Prop)`.
        let expected = other.into_iter().map(|entry| {
            let value: Prop = entry.1.clone().into();
            (entry.0, value)
        });
        Iterator::eq(self.iter(), expected)
    }
}

impl<P: InternalPropertiesOps + Clone> TemporalPropertyView<P> {
    /// Creates a view of the temporal property with id `key`, backed by `props`.
    pub(crate) fn new(props: P, key: usize) -> Self {
        TemporalPropertyView { props, id: key }
    }

    /// Returns `true` when [`Self::iter`] yields no `(time, value)` pair.
    pub fn is_empty(&self) -> bool {
        self.iter().next().is_none()
    }

    /// Returns the [`PropType`] that the property source reports for this property id.
    pub fn dtype(&self) -> PropType {
        self.props.dtype(self.id)
    }

    /// Returns the id of the temporal property this view reads.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Returns a [`History`] built from a clone of this view.
    pub fn history(&self) -> History<'_, Self> {
        History::new(self.clone())
    }

    /// Iterates over the property's values as produced by the property
    /// source's `temporal_values_iter`.
    pub fn values(&self) -> BoxedLIter<'_, Prop> {
        self.props.temporal_values_iter(self.id)
    }

    /// Iterates over the property's values as produced by the property
    /// source's `temporal_values_iter_rev`.
    pub fn values_rev(&self) -> BoxedLIter<'_, Prop> {
        self.props.temporal_values_iter_rev(self.id)
    }

    /// Iterates over `(time, value)` pairs by zipping [`Self::history`] with
    /// [`Self::values`]. The iteration stops as soon as either side is exhausted.
    pub fn iter(&self) -> impl Iterator<Item = (EventTime, Prop)> + '_ {
        self.history().into_iter().zip(self.values())
    }

    /// Returns the first value yielded by [`Self::values`], or `None` if there is none.
    pub fn first(&self) -> Option<Prop> {
        self.values().next()
    }

    /// Iterates over `(time, value)` pairs by zipping the reversed history
    /// with [`Self::values_rev`].
    pub fn iter_rev(&self) -> impl Iterator<Item = (EventTime, Prop)> + '_ {
        self.history().reverse().into_iter().zip(self.values_rev())
    }

    /// Iterates over the `(time, value)` pairs produced directly by the
    /// property source's `temporal_iter`.
    pub fn iter_indexed(&self) -> impl Iterator<Item = (EventTime, Prop)> + use<'_, P> {
        self.props.temporal_iter(self.id)
    }

    /// Iterates over the `(time, value)` pairs produced directly by the
    /// property source's `temporal_iter_rev`.
    pub fn iter_indexed_rev(&self) -> impl Iterator<Item = (EventTime, Prop)> + use<'_, P> {
        self.props.temporal_iter_rev(self.id)
    }

    /// Looks up the value of the property at time `t`.
    ///
    /// `t` is converted with `into_time()`, and the lookup uses
    /// `EventTime::end` of its timestamp.
    // NOTE(review): `EventTime::end` presumably makes the lookup include every
    // event sharing that timestamp — confirm against its definition.
    pub fn at<T: IntoTime>(&self, t: T) -> Option<Prop> {
        let t = EventTime::end(t.into_time().t());
        self.props.temporal_value_at(self.id, t)
    }

    /// Returns the value the property source reports via `temporal_value` for
    /// this property, if any.
    // NOTE(review): presumably the most recent value, as the method name suggests.
    pub fn latest(&self) -> Option<Prop> {
        self.props.temporal_value(self.id)
    }

    /// Returns the distinct values of the property.
    ///
    /// The values pass through a `HashSet`, so the order of the returned
    /// vector is unspecified.
    pub fn unique(&self) -> Vec<Prop> {
        let unique_props: HashSet<_> = self.values().collect();
        unique_props.into_iter().collect()
    }

    /// Collapses runs of consecutive equal values into a single entry.
    ///
    /// Entries are visited in the order produced by iterating `&self`. For
    /// each run of consecutive equal values one `(time, value)` pair is kept:
    /// the last pair of the run when `latest_time` is `true`, otherwise the
    /// first pair of the run.
    pub fn ordered_dedupe(&self, latest_time: bool) -> Vec<(EventTime, Prop)> {
        let mut result: Vec<(EventTime, Prop)> = Vec::new();

        if latest_time {
            // Latest entry seen so far of the run currently being scanned.
            let mut current: Option<(EventTime, Prop)> = None;
            for (t, prop) in self {
                let continues_run = current.as_ref().is_some_and(|(_, value)| *value == prop);
                if !continues_run {
                    // The value changed: the previous run is complete, emit its last entry.
                    result.extend(current.take());
                }
                current = Some((t, prop));
            }
            // Emit the final run, if any.
            result.extend(current);
        } else {
            for (t, prop) in self {
                // The last emitted entry always carries the value of the current run.
                let repeats_last = result.last().is_some_and(|(_, value)| *value == prop);
                if !repeats_last {
                    result.push((t, prop));
                }
            }
        }

        result
    }
}

impl<P: InternalPropertiesOps + Clone> IntoIterator for TemporalPropertyView<P> {
    type Item = (EventTime, Prop);
    type IntoIter = Zip<std::vec::IntoIter<EventTime>, std::vec::IntoIter<Prop>>;

    /// Consumes the view and yields its `(time, value)` pairs.
    ///
    /// Times and values are first collected into vectors, so the returned
    /// iterator owns its data and does not borrow from the view.
    fn into_iter(self) -> Self::IntoIter {
        let times: Vec<EventTime> = self.history().iter().collect();
        let props: Vec<Prop> = self.values().collect();
        std::iter::zip(times, props)
    }
}

impl<P: InternalPropertiesOps + Clone> IntoIterator for &TemporalPropertyView<P> {
    type Item = (EventTime, Prop);
    type IntoIter = Zip<std::vec::IntoIter<EventTime>, std::vec::IntoIter<Prop>>;

    /// Yields the `(time, value)` pairs of a borrowed view.
    ///
    /// Times and values are collected into vectors up front, so the returned
    /// iterator yields owned items and holds no borrow of the view.
    fn into_iter(self) -> Self::IntoIter {
        let times: Vec<EventTime> = self.history().iter().collect();
        let props: Vec<Prop> = self.values().collect();
        std::iter::zip(times, props)
    }
}
/// Handle to the temporal properties exposed by the property source `props`.
#[derive(Clone)]
pub struct TemporalProperties<P>
where
    P: InternalPropertiesOps + Clone,
{
    /// Property source that keys and ids are resolved against.
    pub(crate) props: P,
}

impl<P: InternalPropertiesOps + Clone> IntoIterator for TemporalProperties<P> {
    type Item = (ArcStr, TemporalPropertyView<P>);
    type IntoIter = Zip<std::vec::IntoIter<ArcStr>, std::vec::IntoIter<TemporalPropertyView<P>>>;

    /// Consumes the collection and yields `(key, view)` pairs.
    ///
    /// Keys and views are collected into vectors first and then paired
    /// positionally.
    fn into_iter(self) -> Self::IntoIter {
        let names: Vec<ArcStr> = self.keys().collect();
        let views: Vec<TemporalPropertyView<P>> = self.values().collect();
        std::iter::zip(names, views)
    }
}

impl<P: InternalPropertiesOps + Clone> TemporalProperties<P> {
    pub(crate) fn new(props: P) -> Self {
        Self { props }
    }

    pub fn keys(&self) -> impl Iterator<Item = ArcStr> + '_ {
        self.props.temporal_prop_keys()
    }

    pub fn contains(&self, key: &str) -> bool {
        self.props.get_temporal_prop_id(key).is_some()
    }

    pub fn values(&self) -> impl Iterator<Item = TemporalPropertyView<P>> + '_ {
        self.props
            .temporal_prop_ids()
            .map(|k| TemporalPropertyView::new(self.props.clone(), k))
    }

    pub fn iter_latest(&self) -> impl Iterator<Item = (ArcStr, Prop)> + '_ {
        self.iter().flat_map(|(k, v)| v.latest().map(|v| (k, v)))
    }

    pub fn iter(&self) -> impl Iterator<Item = (ArcStr, TemporalPropertyView<P>)> + '_ {
        self.keys().zip(self.values())
    }

    pub fn iter_filtered(&self) -> impl Iterator<Item = (ArcStr, TemporalPropertyView<P>)> + '_ {
        self.iter().filter(|(_, v)| !v.is_empty())
    }

    pub fn get(&self, key: &str) -> Option<TemporalPropertyView<P>> {
        self.props
            .get_temporal_prop_id(key)
            .map(|k| TemporalPropertyView::new(self.props.clone(), k))
    }

    pub fn get_by_id(&self, id: usize) -> Option<TemporalPropertyView<P>> {
        Some(TemporalPropertyView::new(self.props.clone(), id))
    }

    pub fn histories(&self) -> Vec<(ArcStr, (EventTime, Prop))> {
        self.iter()
            .flat_map(|(k, v)| v.into_iter().map(move |v| (k.clone(), v.clone())))
            .collect()
    }

    pub fn collect_properties(self) -> Vec<(ArcStr, Prop)> {
        self.iter()
            .flat_map(|(k, v)| v.latest().map(|v| (k.clone(), v)))
            .collect()
    }

    pub fn as_map(&self) -> HashMap<ArcStr, Vec<(EventTime, Prop)>> {
        self.iter()
            .map(|(key, value)| (key, value.iter().collect()))
            .collect()
    }
}

impl<P: InternalPropertiesOps + Clone> PropUnwrap for TemporalPropertyView<P> {
    fn into_u8(self) -> Option<u8> {
        self.latest().into_u8()
    }

    fn into_u16(self) -> Option<u16> {
        self.latest().into_u16()
    }

    fn into_str(self) -> Option<ArcStr> {
        self.latest().into_str()
    }

    fn into_i32(self) -> Option<i32> {
        self.latest().into_i32()
    }

    fn into_i64(self) -> Option<i64> {
        self.latest().into_i64()
    }

    fn into_u32(self) -> Option<u32> {
        self.latest().into_u32()
    }

    fn into_u64(self) -> Option<u64> {
        self.latest().into_u64()
    }

    fn into_f32(self) -> Option<f32> {
        self.latest().into_f32()
    }

    fn into_f64(self) -> Option<f64> {
        self.latest().into_f64()
    }

    fn into_bool(self) -> Option<bool> {
        self.latest().into_bool()
    }

    fn into_list(self) -> Option<Arc<Vec<Prop>>> {
        self.latest().into_list()
    }

    fn into_map(self) -> Option<Arc<FxHashMap<ArcStr, Prop>>> {
        self.latest().into_map()
    }

    fn into_ndtime(self) -> Option<NaiveDateTime> {
        self.latest().into_ndtime()
    }

    fn as_f64(&self) -> Option<f64> {
        self.latest().as_f64()
    }

    fn into_decimal(self) -> Option<BigDecimal> {
        self.latest().into_decimal()
    }
}

#[cfg(feature = "arrow")]
impl<P: InternalPropertiesOps + Clone> PropArrayUnwrap for TemporalPropertyView<P> {
    /// Converts the value returned by [`TemporalPropertyView::latest`] into an
    /// Arrow array, delegating to the `PropArrayUnwrap` implementation of that
    /// `Option<Prop>`.
    fn into_array(self) -> Option<ArrayRef> {
        PropArrayUnwrap::into_array(self.latest())
    }
}