use crate::{
core::{
storage::timeindex::{AsTime, EventTime, TimeIndex, TimeIndexOps},
utils::iter::GenLockedDIter,
},
db::{
api::{
properties::internal::InheritPropertiesOps, storage::storage::Storage,
view::internal::*,
},
graph::graph::graph_equal,
},
prelude::*,
};
use raphtory_api::{
core::entities::properties::tprop::TPropOps,
inherit::Base,
iter::{BoxedLDIter, IntoDynDBoxed},
GraphType,
};
use raphtory_storage::{graph::graph::GraphStorage, mutation::InheritMutationOps};
use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
iter,
ops::{Deref, Range},
sync::Arc,
};
/// Graph handle wrapping a shared [`Storage`] (`Arc<Storage>`).
///
/// The `GraphTimeSemanticsOps` impl in this file reports
/// `TimeSemantics::persistent()` for both nodes and edges. The derived `Clone`
/// clones the inner `Arc`, so clones refer to the same storage.
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub struct PersistentGraph(pub(crate) Arc<Storage>);
// Empty marker impl. NOTE(review): presumably tags this type as a concrete,
// owned graph rather than a derived view; confirm against the `Static` trait.
impl Static for PersistentGraph {}
impl From<GraphStorage> for PersistentGraph {
fn from(value: GraphStorage) -> Self {
Self(Arc::new(Storage::from_inner(value)))
}
}
impl From<Arc<Storage>> for PersistentGraph {
fn from(value: Arc<Storage>) -> Self {
Self(value)
}
}
impl Display for PersistentGraph {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, f)
}
}
/// Returns the update reported by `props.last_before(t)`, unless it was deleted.
///
/// The update is discarded (yielding `None`) when `deletions` is active anywhere
/// in the range `last_t..t`, where `last_t` is the time of that update.
/// `None` is also returned when `props.last_before(t)` finds nothing.
fn last_prop_value_before<'a>(
    t: EventTime,
    props: impl TPropOps<'a>,
    deletions: impl TimeIndexOps<'a, IndexType = EventTime>,
) -> Option<(EventTime, Prop)> {
    let (last_t, value) = props.last_before(t)?;
    if deletions.active(last_t..t) {
        // A deletion sits between the update and `t`, so the value did not survive.
        None
    } else {
        Some((last_t, value))
    }
}
/// Returns the property value carried over into time `t`, if there is one.
///
/// Yields `None` when either `props` or `deletions` is active in
/// `t..t.saturating_add(1)`. Otherwise it returns the value found by
/// [`last_prop_value_before`] for `EventTime::start(t)`, dropping the timestamp.
fn persisted_prop_value_at<'a>(
    t: i64,
    props: impl TPropOps<'a>,
    deletions: impl TimeIndexOps<'a, IndexType = EventTime>,
) -> Option<Prop> {
    let instant = t..t.saturating_add(1);
    // An update or deletion exactly at `t` means nothing is carried over from before.
    if props.active_t(instant.clone()) || deletions.active_t(instant) {
        return None;
    }
    let (_, value) = last_prop_value_before(EventTime::start(t), props, deletions)?;
    Some(value)
}
impl PersistentGraph {
pub fn new() -> Self {
Self::default()
}
pub fn from_storage(storage: Arc<Storage>) -> Self {
Self(storage)
}
pub fn from_internal_graph(internal_graph: GraphStorage) -> Self {
Self(Arc::new(Storage::from_inner(internal_graph)))
}
pub fn event_graph(&self) -> Graph {
Graph::from_storage(self.0.clone())
}
pub fn persistent_graph(&self) -> PersistentGraph {
self.clone()
}
}
/// Compares against any graph view by delegating to `graph_equal`.
impl<'graph, G> PartialEq<G> for PersistentGraph
where
    G: GraphViewOps<'graph>,
{
    fn eq(&self, other: &G) -> bool {
        graph_equal(self, other)
    }
}
/// Exposes the wrapped [`Storage`] as this type's `Base`.
impl Base for PersistentGraph {
    type Base = Storage;

    #[inline(always)]
    fn base(&self) -> &Self::Base {
        // Explicitly dereference the `Arc` to borrow the storage it points to.
        &*self.0
    }
}
// Empty marker impl. NOTE(review): presumably opts in to a blanket impl that
// forwards storage operations to `Base` (`Storage`); confirm against the trait.
impl InheritStorageOps for PersistentGraph {}
/// Reports this graph's type tag as `GraphType::PersistentGraph`.
impl InternalMaterialize for PersistentGraph {
fn graph_type(&self) -> GraphType {
GraphType::PersistentGraph
}
}
// Empty marker impls. NOTE(review): presumably each one opts in to a blanket
// impl that forwards the corresponding operations to `Base` (`Storage`);
// confirm against the trait definitions.
impl InheritMutationOps for PersistentGraph {}
impl InheritListOps for PersistentGraph {}
impl InheritCoreGraphOps for PersistentGraph {}
impl InheritPropertiesOps for PersistentGraph {}
impl InheritLayerOps for PersistentGraph {}
impl InheritAllEdgeFilterOps for PersistentGraph {}
impl InheritNodeFilterOps for PersistentGraph {}
/// Time semantics for [`PersistentGraph`].
///
/// Unwindowed queries are forwarded to the wrapped storage. The windowed
/// variants clamp times to the window and carry a property value over into the
/// start of the window.
impl GraphTimeSemanticsOps for PersistentGraph {
    /// Nodes use persistent time semantics.
    fn node_time_semantics(&self) -> TimeSemantics {
        TimeSemantics::persistent()
    }

    /// Edges use persistent time semantics.
    fn edge_time_semantics(&self) -> TimeSemantics {
        TimeSemantics::persistent()
    }

    /// Forwards to the wrapped storage.
    fn view_start(&self) -> Option<EventTime> {
        self.0.view_start()
    }

    /// Forwards to the wrapped storage.
    fn view_end(&self) -> Option<EventTime> {
        self.0.view_end()
    }

    /// Forwards to the wrapped storage.
    fn earliest_time_global(&self) -> Option<i64> {
        self.0.earliest_time_global()
    }

    /// Forwards to the wrapped storage.
    fn latest_time_global(&self) -> Option<i64> {
        self.0.latest_time_global()
    }

    /// Earliest time inside `start..end`: the global earliest time raised to at
    /// least `start.t()`, or `None` if that is not strictly before `end.t()`.
    fn earliest_time_window(&self, start: EventTime, end: EventTime) -> Option<i64> {
        let earliest = self.earliest_time_global()?.max(start.t());
        (earliest < end.t()).then_some(earliest)
    }

    /// Latest time inside `start..end`: the global latest time clamped into
    /// `[start.t(), end.t() - 1]`. Returns `None` when the graph has no earliest
    /// time or when its earliest time is not before `end.t()`.
    fn latest_time_window(&self, start: EventTime, end: EventTime) -> Option<i64> {
        let earliest = self.0.earliest_time_global()?;
        if earliest >= end.t() {
            return None;
        }
        let upper = end.t().saturating_sub(1);
        let latest = self.latest_time_global()?;
        Some(latest.min(upper).max(start.t()))
    }

    /// Forwards to the wrapped storage.
    #[inline]
    fn has_temporal_prop(&self, prop_id: usize) -> bool {
        self.0.has_temporal_prop(prop_id)
    }

    /// Forwards to the wrapped storage.
    fn temporal_prop_iter(&self, prop_id: usize) -> BoxedLDIter<'_, (EventTime, Prop)> {
        self.0.temporal_prop_iter(prop_id)
    }

    /// True when the windowed iterator for `prop_id` yields at least one entry.
    #[inline]
    fn has_temporal_prop_window(&self, prop_id: usize, w: Range<EventTime>) -> bool {
        let Range { start, end } = w;
        self.temporal_prop_iter_window(prop_id, start, end)
            .next()
            .is_some()
    }

    /// Iterates the values of graph property `prop_id` over `start..end`.
    ///
    /// If a value is carried over into `start.t()` (see
    /// `persisted_prop_value_at`), it is yielded first, stamped with `start`,
    /// followed by the updates inside the window. An unknown `prop_id` yields
    /// an empty iterator.
    ///
    /// NOTE(review): the carried-over check works on the whole time unit
    /// `start.t()`, while the window begins at the full `EventTime` `start`.
    /// Confirm that callers only pass a `start` for which the two agree.
    fn temporal_prop_iter_window(
        &self,
        prop_id: usize,
        start: EventTime,
        end: EventTime,
    ) -> BoxedLDIter<'_, (EventTime, Prop)> {
        match self.graph_meta().get_temporal_prop(prop_id) {
            None => iter::empty().into_dyn_dboxed(),
            Some(prop) => {
                // No deletions are involved here, hence the empty index.
                let carried_over =
                    persisted_prop_value_at(start.t(), &*prop, &TimeIndex::Empty);
                let head = carried_over.map(|value| (start, value));
                let tail = GenLockedDIter::from(prop, |locked| {
                    locked.deref().iter_window(start..end).into_dyn_dboxed()
                });
                head.into_iter().chain(tail).into_dyn_dboxed()
            }
        }
    }

    /// Forwards to the wrapped storage.
    fn temporal_prop_last_at(&self, prop_id: usize, t: EventTime) -> Option<(EventTime, Prop)> {
        self.0.temporal_prop_last_at(prop_id, t)
    }

    /// Last value of `prop_id` at `t`, restricted to window `w`.
    ///
    /// Returns `None` when `t` lies outside `w`. Otherwise the storage's answer
    /// is returned with its timestamp raised to at least `w.start`.
    fn temporal_prop_last_at_window(
        &self,
        prop_id: usize,
        t: EventTime,
        w: Range<EventTime>,
    ) -> Option<(EventTime, Prop)> {
        if !w.contains(&t) {
            return None;
        }
        let (last_t, value) = self.0.temporal_prop_last_at(prop_id, t)?;
        Some((last_t.max(w.start), value))
    }
}
// Unit tests for the timeline bounds and windowed earliest/latest times of
// `PersistentGraph`.
#[cfg(test)]
mod test {
use crate::db::api::view::time::internal::InternalTimeOps;
use super::*;
// Checks `start`/`end` and `timeline_start`/`timeline_end` on an unwindowed
// graph and on a window, as an edge is added, deleted and updated.
#[test]
fn test_view_start_end() {
let g = PersistentGraph::new();
let e = g.add_edge(0, 1, 2, NO_PROPS, None).unwrap();
// The unwindowed graph has no explicit start/end; the timeline bounds are
// expected to be 0 and 1 after the single event at time 0.
assert_eq!(g.start(), None);
assert_eq!(g.timeline_start().map(|t| t.t()), Some(0));
assert_eq!(g.end(), None);
assert_eq!(g.timeline_end().map(|t| t.t()), Some(1));
// After the deletion at time 2 the expected timeline end moves to 3.
e.delete(2, None).unwrap();
assert_eq!(g.timeline_start().map(|t| t.t()), Some(0));
assert_eq!(g.timeline_end().map(|t| t.t()), Some(3));
// A window over the whole timeline reports the window bounds as start/end.
let w = g.window(g.timeline_start().unwrap(), g.timeline_end().unwrap());
assert!(g.has_edge(1, 2));
assert!(w.has_edge(1, 2));
assert_eq!(w.start().map(|t| t.t()), Some(0));
assert_eq!(w.timeline_start().map(|t| t.t()), Some(0));
assert_eq!(w.end().map(|t| t.t()), Some(3));
assert_eq!(w.timeline_end().map(|t| t.t()), Some(3));
// After the update at time 4 the expected timeline end moves to 5.
e.add_updates(4, NO_PROPS, None).unwrap();
assert_eq!(g.timeline_start().map(|t| t.t()), Some(0));
assert_eq!(g.timeline_end().map(|t| t.t()), Some(5));
}
// Checks that the window 3..5, which lies between the edge's addition (0) and
// its deletion (10), reports earliest and latest times of 3, and that the
// materialized window has the same earliest time.
#[test]
fn test_materialize_window_earliest_time() {
let g = PersistentGraph::new();
g.add_edge(0, 1, 2, NO_PROPS, None).unwrap();
g.delete_edge(10, 1, 2, None).unwrap();
let ltg = g.latest_time_global();
assert_eq!(ltg, Some(10));
let wg = g.window(3, 5);
// The edge and both of its endpoints are expected to report time 3.
let e = wg.edge(1, 2).unwrap();
assert_eq!(e.earliest_time().map(|t| t.t()), Some(3));
assert_eq!(e.latest_time().map(|t| t.t()), Some(3));
let n1 = wg.node(1).unwrap();
assert_eq!(n1.earliest_time().unwrap().t(), 3);
assert_eq!(n1.latest_time().unwrap().t(), 3);
let n2 = wg.node(2).unwrap();
assert_eq!(n2.earliest_time().unwrap().t(), 3);
assert_eq!(n2.latest_time().unwrap().t(), 3);
// The windowed graph as a whole is expected to report time 3 as well.
let actual_lt = wg.latest_time();
assert_eq!(actual_lt.unwrap().t(), 3);
let actual_et = wg.earliest_time();
assert_eq!(actual_et.unwrap().t(), 3);
// Materializing the same window must give the same earliest time.
let gm = g
.window(3, 5)
.materialize()
.unwrap()
.into_persistent()
.unwrap();
let expected_et = gm.earliest_time();
assert_eq!(actual_et, expected_et);
}
}