use crate::{
db::{
api::{
state::{
ops::{self, node::NodeOp, DynNodeFilter, HistoryOp, IntoArrowNodeOp},
DateTimesStruct, EventIdsStruct, IntervalsStruct, LazyNodeState, NodeState,
TimeStampsStruct,
},
view::{
history::{History, InternalHistoryOps},
DynamicGraph,
},
},
graph::{node::NodeView, nodes::Nodes},
},
impl_lazy_node_state, impl_node_state, impl_node_state_ops,
prelude::{GraphViewOps, NodeStateOps, NodeViewOps},
python::{
graph::history::{
PyHistory, PyHistoryDateTime, PyHistoryEventId, PyHistoryTimestamp, PyIntervals,
},
types::{repr::Repr, wrappers::iterators::PyBorrowingIterator},
utils::PyNodeRef,
},
};
use pyo3::{
exceptions::{PyKeyError, PyTypeError},
prelude::*,
types::{PyDict, PyNotImplemented},
IntoPyObjectExt,
};
use raphtory_api::{core::storage::timeindex::EventTime, python::timeindex::PyOptionalEventTime};
use raphtory_core::entities::nodes::node_ref::{AsNodeRef, NodeRef};
use std::{collections::HashMap, sync::Arc};
pub(crate) use crate::{
db::api::state::ops::IntoDynNodeOp, python::graph::node_state::node_state::ops::NodeFilterOp,
};
use crate::{
db::{api::state::OutputTypedNodeState, graph::nodes::IntoDynNodes},
python::graph::node_state::ops::{ArrowMap, Map},
};
#[pyclass(module = "raphtory.node_state", frozen)]
pub struct HistoryView {
inner: LazyNodeState<
'static,
HistoryOp<'static, DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
>,
}
impl HistoryView {
pub fn inner(
&self,
) -> &LazyNodeState<
'static,
HistoryOp<'static, DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
> {
&self.inner
}
pub fn iter(
&self,
) -> impl Iterator<Item = History<'static, NodeView<'static, DynamicGraph>>> + '_ {
self.inner.iter_values()
}
}
#[pymethods]
impl HistoryView {
#[getter]
fn t(
&self,
) -> LazyNodeState<
'static,
ArrowMap<Map<HistoryOp<'static, DynamicGraph>, PyHistoryTimestamp>, TimeStampsStruct>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
> {
let op = self.inner.op.clone().map(|hist| hist.t().into());
LazyNodeState::new(op.into_arrow_node_op(), self.inner.nodes())
}
#[getter]
fn dt(
&self,
) -> LazyNodeState<
'static,
ArrowMap<Map<HistoryOp<'static, DynamicGraph>, PyHistoryDateTime>, DateTimesStruct>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
> {
let op = self.inner.op.clone().map(|hist| hist.dt().into());
LazyNodeState::new(op.into_arrow_node_op(), self.inner.nodes())
}
#[getter]
fn event_id(
&self,
) -> LazyNodeState<
'static,
ArrowMap<Map<HistoryOp<'static, DynamicGraph>, PyHistoryEventId>, EventIdsStruct>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
> {
let op = self.inner.op.clone().map(|hist| hist.event_id().into());
LazyNodeState::new(op.into_arrow_node_op(), self.inner.nodes())
}
#[getter]
fn intervals(
&self,
) -> LazyNodeState<
'static,
ArrowMap<Map<HistoryOp<'static, DynamicGraph>, PyIntervals>, IntervalsStruct>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
> {
let op = self.inner.op.clone().map(|hist| hist.intervals().into());
LazyNodeState::new(op.into_arrow_node_op(), self.inner.nodes())
}
fn earliest_time(
&self,
) -> LazyNodeState<
'static,
ops::EarliestTime<DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
> {
self.inner.earliest_time()
}
fn latest_time(
&self,
) -> LazyNodeState<
'static,
ops::LatestTime<DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
> {
self.inner.latest_time()
}
fn compute(
&self,
) -> NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph> {
self.inner.compute()
}
fn collect(&self) -> Vec<History<'static, NodeView<'static, DynamicGraph>>> {
self.inner.collect()
}
fn collect_time_entries(&self) -> Vec<EventTime> {
self.inner.collect_time_entries()
}
fn flatten(&self) -> PyHistory {
self.inner.flatten().into_arc_dyn().into()
}
fn __len__(&self) -> usize {
NodeStateOps::len(&self.inner)
}
fn nodes(&self) -> Nodes<'static, DynamicGraph, DynamicGraph, DynNodeFilter> {
self.inner.nodes().into_dyn()
}
fn __eq__<'py>(
&self,
other: &Bound<'py, PyAny>,
py: Python<'py>,
) -> Result<Bound<'py, PyAny>, std::convert::Infallible> {
let res = if let Ok(other) = other.downcast::<Self>() {
let other = Bound::get(other);
self.inner == other.inner
} else if let Ok(other) =
other.extract::<Vec<History<'static, Arc<dyn InternalHistoryOps>>>>()
{
NodeStateOps::len(&self.inner) == other.len()
&& self
.inner
.iter_values()
.zip(other.into_iter())
.all(|(left, right)| left.iter().eq(right.iter()))
} else if let Ok(other) =
other.extract::<HashMap<PyNodeRef, History<'static, Arc<dyn InternalHistoryOps>>>>()
{
NodeStateOps::len(&self.inner) == other.len()
&& other.into_iter().all(|(node, value)| {
self.inner
.get_by_node(node)
.map(|v| v.iter().eq(value.iter()))
.unwrap_or(false)
})
} else if let Ok(other) = other.downcast::<PyDict>() {
NodeStateOps::len(&self.inner) == other.len()
&& other.items().iter().all(|item| {
if let Ok((node_ref, value)) = item.extract::<(PyNodeRef, Bound<'py, PyAny>)>()
{
self.inner
.get_by_node(node_ref)
.map(|l_value| {
if let Ok(l_value_py) = l_value.clone().into_bound_py_any(py) {
l_value_py.eq(value).unwrap_or(false)
} else {
false
}
})
.unwrap_or(false)
} else {
false
}
})
} else {
return Ok(PyNotImplemented::get(py).to_owned().into_any());
};
Ok(res.into_pyobject(py)?.to_owned().into_any())
}
fn __iter__(&self) -> PyBorrowingIterator {
py_borrowing_iter!(
self.inner.clone(),
LazyNodeState<
'static,
HistoryOp<'static, DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
>,
|inner| inner.iter_values()
)
}
#[pyo3(signature = (node, default=None::<PyHistory>))]
fn get(&self, node: PyNodeRef, default: Option<PyHistory>) -> Option<PyHistory> {
self.inner.get_by_node(node).map(|v| v.into()).or(default)
}
fn __getitem__(
&self,
node: PyNodeRef,
) -> PyResult<History<'static, NodeView<'static, DynamicGraph>>> {
let node = node.as_node_ref();
self.inner.get_by_node(node).ok_or_else(|| match node {
NodeRef::External(id) => {
PyKeyError::new_err(format!("Missing value for node with id {id}"))
}
NodeRef::Internal(vid) => {
let node = self.inner.graph().node(vid);
match node {
Some(node) => PyKeyError::new_err(format!("Missing value {}", node.repr())),
None => PyTypeError::new_err("Invalid node reference"),
}
}
})
}
fn items(&self) -> PyBorrowingIterator {
py_borrowing_iter!(
self.inner.clone(),
LazyNodeState<
'static,
HistoryOp<'static, DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
>,
|inner| NodeStateOps::iter(inner).map(|(n, v)| (n.cloned(), v))
)
}
fn values(&self) -> PyBorrowingIterator {
self.__iter__()
}
fn sorted_by_id(
&self,
) -> NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph> {
self.inner.sort_by_id()
}
fn __repr__(&self) -> String {
self.inner.repr()
}
fn to_df<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let pandas = PyModule::import(py, "pandas")?;
let columns = PyDict::new(py);
columns.set_item("node", self.inner.nodes().id())?;
columns.set_item("value", self.values())?;
pandas.call_method("DataFrame", (columns,), None)
}
}
impl
From<
LazyNodeState<
'static,
HistoryOp<'static, DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
>,
> for HistoryView
{
fn from(
inner: LazyNodeState<
'static,
HistoryOp<'static, DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
>,
) -> Self {
HistoryView { inner }
}
}
impl<'py> pyo3::IntoPyObject<'py>
for LazyNodeState<
'static,
HistoryOp<'static, DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
>
{
type Target = HistoryView;
type Output = Bound<'py, Self::Target>;
type Error = <Self::Target as pyo3::IntoPyObject<'py>>::Error;
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
HistoryView::from(self).into_pyobject(py)
}
}
impl<'py> FromPyObject<'py>
for LazyNodeState<
'static,
HistoryOp<'static, DynamicGraph>,
DynamicGraph,
DynamicGraph,
DynNodeFilter,
>
{
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
Ok(ob.downcast::<HistoryView>()?.get().inner().clone())
}
}
#[allow(dead_code)]
type HistoryOpType<G> = HistoryOp<'static, G>;
#[pyclass(module = "raphtory.node_state", frozen)]
pub struct NodeStateHistory {
inner: NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph>,
}
impl NodeStateHistory {
pub fn inner(
&self,
) -> &NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph> {
&self.inner
}
pub fn iter(
&self,
) -> impl Iterator<Item = History<'static, NodeView<'static, DynamicGraph>>> + '_ {
self.inner.iter_values().map(|v| v.clone())
}
}
#[pymethods]
impl NodeStateHistory {
#[getter]
fn t(&self) -> NodeState<'static, PyHistoryTimestamp, DynamicGraph> {
let values = self
.inner
.iter_values()
.map(|h| h.clone().t().into())
.collect::<Vec<PyHistoryTimestamp>>()
.into();
NodeState::new(self.inner.graph().clone(), values, self.inner.ids().clone())
}
#[getter]
fn dt(&self) -> NodeState<'static, PyHistoryDateTime, DynamicGraph> {
let values = self
.inner
.iter_values()
.map(|h| h.clone().dt().into())
.collect::<Vec<PyHistoryDateTime>>()
.into();
NodeState::new(self.inner.graph().clone(), values, self.inner.ids().clone())
}
#[getter]
fn event_id(&self) -> NodeState<'static, PyHistoryEventId, DynamicGraph> {
let values = self
.inner
.iter_values()
.map(|h| h.clone().event_id().into())
.collect::<Vec<PyHistoryEventId>>()
.into();
NodeState::new(self.inner.graph().clone(), values, self.inner.ids().clone())
}
#[getter]
fn intervals(&self) -> NodeState<'static, PyIntervals, DynamicGraph> {
let values = self
.inner
.iter_values()
.map(|h| h.clone().intervals().into())
.collect::<Vec<PyIntervals>>()
.into();
NodeState::new(self.inner.graph().clone(), values, self.inner.ids().clone())
}
fn earliest_time(&self) -> PyOptionalEventTime {
self.inner.earliest_time().into()
}
fn latest_time(&self) -> PyOptionalEventTime {
self.inner.latest_time().into()
}
fn collect_time_entries(&self) -> Vec<EventTime> {
self.inner.collect_time_entries()
}
fn flatten(&self) -> PyHistory {
self.inner.flatten().into_arc_dyn().into()
}
fn __len__(&self) -> usize {
self.inner.len()
}
fn nodes(&self) -> Nodes<'static, DynamicGraph> {
self.inner.nodes()
}
fn __eq__<'py>(
&self,
other: &Bound<'py, PyAny>,
py: Python<'py>,
) -> Result<Bound<'py, PyAny>, std::convert::Infallible> {
let res = if let Ok(other) = other.downcast::<Self>() {
let other = Bound::get(other);
self.inner == other.inner
} else if let Ok(other) =
other.extract::<Vec<History<'static, Arc<dyn InternalHistoryOps>>>>()
{
self.inner.len() == other.len()
&& self
.inner
.iter_values()
.zip(other.into_iter())
.all(|(left, right)| left.iter().eq(right.iter()))
} else if let Ok(other) =
other.extract::<HashMap<PyNodeRef, History<'static, Arc<dyn InternalHistoryOps>>>>()
{
self.inner.len() == other.len()
&& other.into_iter().all(|(node, value)| {
self.inner
.get_by_node(node)
.map(|v| v.iter().eq(value.iter()))
.unwrap_or(false)
})
} else if let Ok(other) = other.downcast::<PyDict>() {
self.inner.len() == other.len()
&& other.items().iter().all(|item| {
if let Ok((node_ref, value)) = item.extract::<(PyNodeRef, Bound<'py, PyAny>)>()
{
self.inner
.get_by_node(node_ref)
.map(|l_value| {
if let Ok(l_value_py) = l_value.clone().into_bound_py_any(py) {
l_value_py.eq(value).unwrap_or(false)
} else {
false
}
})
.unwrap_or(false)
} else {
false
}
})
} else {
return Ok(PyNotImplemented::get(py).to_owned().into_any());
};
Ok(res.into_pyobject(py)?.to_owned().into_any())
}
fn __iter__(&self) -> PyBorrowingIterator {
py_borrowing_iter!(
self.inner.clone(),
NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph>,
|inner| inner.iter_values().map(|v| v.clone())
)
}
#[pyo3(signature = (node, default=None::<PyHistory>))]
fn get(&self, node: PyNodeRef, default: Option<PyHistory>) -> Option<PyHistory> {
self.inner
.get_by_node(node)
.map(|v| v.clone().into())
.or(default)
}
fn __getitem__(
&self,
node: PyNodeRef,
) -> PyResult<History<'static, NodeView<'static, DynamicGraph>>> {
let node = node.as_node_ref();
self.inner
.get_by_node(node)
.map(|v| v.clone())
.ok_or_else(|| match node {
NodeRef::External(id) => {
PyKeyError::new_err(format!("Missing value for node with id {id}"))
}
NodeRef::Internal(vid) => {
let node = self.inner.graph().node(vid);
match node {
Some(node) => PyKeyError::new_err(format!("Missing value {}", node.repr())),
None => PyTypeError::new_err("Invalid node reference"),
}
}
})
}
fn items(&self) -> PyBorrowingIterator {
py_borrowing_iter!(
self.inner.clone(),
NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph>,
|inner| inner.iter().map(|(n, v)| (n.cloned(), v.clone()))
)
}
fn values(&self) -> PyBorrowingIterator {
self.__iter__()
}
fn sorted_by_id(
&self,
) -> NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph> {
self.inner.sort_by_id()
}
fn __repr__(&self) -> String {
self.inner.repr()
}
fn to_df<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let pandas = PyModule::import(py, "pandas")?;
let columns = PyDict::new(py);
columns.set_item("node", self.inner.nodes().id())?;
columns.set_item("value", self.values())?;
pandas.call_method("DataFrame", (columns,), None)
}
}
impl From<NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph>>
for NodeStateHistory
{
fn from(
inner: NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph>,
) -> Self {
NodeStateHistory { inner: inner }
}
}
impl<'py> pyo3::IntoPyObject<'py>
for NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph>
{
type Target = NodeStateHistory;
type Output = Bound<'py, Self::Target>;
type Error = <Self::Target as pyo3::IntoPyObject<'py>>::Error;
fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
NodeStateHistory::from(self).into_pyobject(py)
}
}
impl<'py> FromPyObject<'py>
for NodeState<'static, History<'static, NodeView<'static, DynamicGraph>>, DynamicGraph>
{
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
Ok(ob.downcast::<NodeStateHistory>()?.get().inner().clone())
}
}
impl From<PyHistoryTimestamp> for TimeStampsStruct {
fn from(history: PyHistoryTimestamp) -> Self {
TimeStampsStruct {
timestamps: history.history_t.collect(),
}
}
}
impl From<PyHistoryEventId> for EventIdsStruct {
fn from(event_ids: PyHistoryEventId) -> Self {
EventIdsStruct {
event_ids: event_ids.history_s.collect(),
}
}
}
impl From<PyHistoryDateTime> for DateTimesStruct {
fn from(datetimes: PyHistoryDateTime) -> Self {
DateTimesStruct {
datetimes: datetimes.history_dt.collect().ok(),
}
}
}
impl From<PyIntervals> for IntervalsStruct {
fn from(intervals: PyIntervals) -> Self {
IntervalsStruct {
intervals: intervals.intervals.collect(),
}
}
}
type HistoryI64<G> = ArrowMap<Map<HistoryOp<'static, G>, PyHistoryTimestamp>, TimeStampsStruct>;
impl_lazy_node_state!(
HistoryTimestampView<HistoryI64<DynamicGraph>>,
"NodeStateHistoryTimestamp",
"HistoryTimestamp"
);
impl_node_state!(
NodeStateHistoryTimestamp<PyHistoryTimestamp>,
"NodeStateHistoryTimestamp",
"HistoryTimestamp"
);
type HistoryU64<G> = ArrowMap<Map<HistoryOp<'static, G>, PyHistoryEventId>, EventIdsStruct>;
impl_lazy_node_state!(
HistoryEventIdView<HistoryU64<DynamicGraph>>,
"NodeStateHistoryEventId",
"HistoryEventId"
);
impl_node_state!(
NodeStateHistoryEventId<PyHistoryEventId>,
"NodeStateHistoryEventId",
"HistoryEventId"
);
type HistoryDT<G> = ArrowMap<Map<HistoryOp<'static, G>, PyHistoryDateTime>, DateTimesStruct>;
impl_lazy_node_state!(
HistoryDateTimeView<HistoryDT<DynamicGraph>>,
"NodeStateHistoryDateTime",
"HistoryDateTime"
);
impl_node_state!(
NodeStateHistoryDateTime<PyHistoryDateTime>,
"NodeStateHistoryDateTime",
"HistoryDateTime"
);