use crate::{
db::{
api::{
state::{
ops::{self, ArrowMap, DynNodeFilter, IntoArrowNodeOp, Map},
DateTimeStruct, EventIdStruct, LazyNodeState, NodeGroups, NodeOp, NodeState,
NodeStateOps, OutputTypedNodeState, TimeStampStruct,
},
view::DynamicGraph,
},
graph::{
node::NodeView,
nodes::{IntoDynNodes, Nodes},
},
},
impl_lazy_node_state, impl_lazy_node_state_ord, impl_node_state_group_by_ops,
impl_node_state_ops, impl_node_state_ord_ops,
prelude::*,
python::{
types::{repr::Repr, wrappers::iterators::PyBorrowingIterator},
utils::PyNodeRef,
},
};
use chrono::{DateTime, Utc};
use pyo3::{
exceptions::{PyKeyError, PyTypeError},
prelude::*,
types::{PyDict, PyNotImplemented},
IntoPyObjectExt,
};
use raphtory_api::core::storage::timeindex::{EventTime, TimeError};
use raphtory_core::entities::nodes::node_ref::{AsNodeRef, NodeRef};
use rayon::prelude::*;
use std::{cmp::Ordering, collections::HashMap};
use crate::{
db::api::{
state::{ops::IntoDynNodeOp, NodeStateGroupBy, OrderedNodeStateOps},
view::GraphViewOps,
},
python::graph::node_state::node_state::ops::NodeFilterOp,
};
// Base operation for this file: the earliest time of each node in a `DynamicGraph`.
// Every other op below is derived from it.
type EarliestTimeOp = ops::history::EarliestTime<DynamicGraph>;
// Generates the lazy `EarliestTimeView` over `EarliestTimeOp` (it is not otherwise
// defined in this file; it is extended by the `#[pymethods]` block below).
// NOTE(review): the two string literals are Python-facing type names consumed by the
// macro — see `impl_lazy_node_state_ord!` for exactly how each one is used.
impl_lazy_node_state_ord!(
    EarliestTimeView<EarliestTimeOp>,
    "NodeStateOptionEventTime",
    "Optional[EventTime]"
);
// Group-by support for `EarliestTimeView`, keyed by the per-node `Option<EventTime>`.
impl_node_state_group_by_ops!(EarliestTimeView, Option<EventTime>);
/// Lazy node state over the dynamic graph for the operation `Op`.
type EarliestComponentState<Op> =
    LazyNodeState<'static, Op, DynamicGraph, DynamicGraph, DynNodeFilter>;

#[pymethods]
impl EarliestTimeView {
    /// The earliest time of each node as an integer timestamp.
    ///
    /// Returns:
    ///     a lazy node state of `Optional[int]` values
    #[getter]
    fn t(&self) -> EarliestComponentState<EarliestTimestamp> {
        self.inner.t()
    }

    /// The earliest time of each node as a UTC datetime.
    ///
    /// Returns:
    ///     a lazy node state of `Optional[datetime]` values
    #[getter]
    fn dt(&self) -> EarliestComponentState<EarliestDateTime> {
        self.inner.dt()
    }

    /// The event id of the earliest time of each node.
    ///
    /// Returns:
    ///     a lazy node state of `Optional[int]` values
    #[getter]
    fn event_id(&self) -> EarliestComponentState<EarliestEventId> {
        self.inner.event_id()
    }
}
/// Earliest time of each node converted to an optional UTC datetime.
///
/// The conversion is fallible, so each per-node value is a `Result` carrying a
/// `TimeError` on failure.
type EarliestDateTime = ArrowMap<
    Map<EarliestTimeOp, Result<Option<DateTime<Utc>>, TimeError>>,
    DateTimeStruct,
>;
/// Per-node value produced by `EarliestDateTime`.
type EarliestDateTimeOutput = <EarliestDateTime as NodeOp>::Output;
/// Earliest time of each node projected to an optional integer timestamp.
type EarliestTimestamp = ArrowMap<Map<EarliestTimeOp, Option<i64>>, TimeStampStruct>;
/// Earliest time of each node projected to its optional event id.
type EarliestEventId = ArrowMap<Map<EarliestTimeOp, Option<usize>>, EventIdStruct>;
// Generates the lazy `EarliestTimestampView` over `EarliestTimestamp` (the
// `Option<i64>` projection of the earliest time).
// NOTE(review): string arguments are Python-facing names consumed by the macro.
impl_lazy_node_state_ord!(
    EarliestTimestampView<EarliestTimestamp>,
    "NodeStateOptionI64",
    "Optional[int]"
);
// Group-by support for `EarliestTimestampView`, keyed by `Option<i64>`.
impl_node_state_group_by_ops!(EarliestTimestampView, Option<i64>);
// Generates the lazy `EarliestEventIdView` over `EarliestEventId` (the
// `Option<usize>` event-id projection of the earliest time).
impl_lazy_node_state_ord!(
    EarliestEventIdView<EarliestEventId>,
    "NodeStateOptionUsize",
    "Optional[int]"
);
// Group-by support for `EarliestEventIdView`, keyed by `Option<usize>`.
impl_node_state_group_by_ops!(EarliestEventIdView, Option<usize>);
/// Lazy view of the earliest time of each node as an optional UTC datetime.
///
/// Values are computed on demand, and computing a value can fail.
#[pyclass(frozen, module = "raphtory.node_state")]
pub struct EarliestDateTimeView {
    /// The wrapped lazy state producing `Result<Option<DateTime<Utc>>, TimeError>` per node.
    inner: LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>,
}
impl EarliestDateTimeView {
    /// Borrow the wrapped lazy node state.
    pub fn inner(
        &self,
    ) -> &LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter> {
        &self.inner
    }

    /// Iterate over the per-node results (each a value or a `TimeError`), without node handles.
    pub fn iter(&self) -> impl Iterator<Item = EarliestDateTimeOutput> + '_ {
        self.inner().iter_values()
    }
}
#[pymethods]
impl EarliestDateTimeView {
fn compute(
&self,
) -> Result<NodeState<'static, Option<DateTime<Utc>>, DynamicGraph>, TimeError> {
self.inner.compute_result_type()
}
fn compute_valid(&self) -> NodeState<'static, Option<DateTime<Utc>>, DynamicGraph> {
self.inner.compute_valid_results()
}
fn collect(&self) -> PyResult<Vec<Option<DateTime<Utc>>>> {
self.inner
.iter_values()
.map(|v| v.map_err(PyErr::from))
.collect::<PyResult<Vec<_>>>()
}
fn collect_valid(&self) -> Vec<DateTime<Utc>> {
self.inner
.iter_values()
.filter_map(|r| r.ok().flatten())
.collect::<Vec<_>>()
}
fn __len__(&self) -> usize {
self.inner.len()
}
fn nodes(&self) -> Nodes<'static, DynamicGraph, DynamicGraph, DynNodeFilter> {
self.inner.nodes().into_dyn()
}
fn __eq__<'py>(
&self,
other: &Bound<'py, PyAny>,
py: Python<'py>,
) -> Result<Bound<'py, PyAny>, std::convert::Infallible> {
let res = if let Ok(other) = other.downcast::<Self>() {
let other = Bound::get(other);
self.inner == other.inner
} else if let Ok(other) = other.extract::<Vec<Option<DateTime<Utc>>>>() {
self.inner
.iter_values()
.eq(other.into_iter().map(|o| Ok(o)))
} else if let Ok(other) = other.extract::<HashMap<PyNodeRef, Option<DateTime<Utc>>>>() {
self.inner.len() == other.len()
&& other
.into_iter()
.all(|(node, value)| self.inner.get_by_node(node) == Some(Ok(value)))
} else if let Ok(other) = other.downcast::<PyDict>() {
self.inner.len() == other.len()
&& other.items().iter().all(|item| {
if let Ok((node_ref, value)) = item.extract::<(PyNodeRef, Bound<'py, PyAny>)>()
{
self.inner
.get_by_node(node_ref)
.map(|l_value| {
match l_value {
Ok(inner_value) => {
if let Ok(l_value_py) = inner_value.into_bound_py_any(py) {
l_value_py.eq(value).unwrap_or(false)
} else {
false
}
}
Err(_) => false, }
})
.unwrap_or(false)
} else {
false
}
})
} else {
return Ok(PyNotImplemented::get(py).to_owned().into_any());
};
Ok(res.into_pyobject(py)?.to_owned().into_any())
}
fn __iter__(&self) -> PyBorrowingIterator {
py_borrowing_iter_result!(
self.inner.clone(),
LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>,
|inner| inner.iter_values()
)
}
fn iter_valid(&self) -> PyBorrowingIterator {
py_borrowing_iter!(
self.inner.clone(),
LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>,
|inner| inner.iter_values().filter_map(|r| r.ok().flatten())
)
}
#[doc = " default (Optional[datetime]): the default value. Defaults to None."]
#[doc = " Optional[datetime]: the value for the node or the default value"]
#[pyo3(signature = (node, default=None::<DateTime<Utc>>))]
fn get(
&self,
node: PyNodeRef,
default: Option<DateTime<Utc>>,
) -> PyResult<Option<DateTime<Utc>>> {
match self.inner.get_by_node(node) {
Some(v) => v.map_err(PyErr::from),
None => Ok(default),
}
}
fn __getitem__(&self, node: PyNodeRef) -> PyResult<Option<DateTime<Utc>>> {
let node = node.as_node_ref();
match self.inner.get_by_node(node) {
Some(v) => v.map_err(PyErr::from),
None => match node {
NodeRef::External(id) => Err(PyKeyError::new_err(format!(
"Missing value for node with id {id}"
))),
NodeRef::Internal(vid) => {
let node = self.inner.graph().node(vid);
match node {
Some(node) => Err(PyKeyError::new_err(format!(
"Missing value {}",
node.repr()
))),
None => Err(PyTypeError::new_err("Invalid node reference")),
}
}
},
}
}
fn items(&self) -> PyBorrowingIterator {
py_borrowing_iter_tuple_result!(
self.inner.clone(),
LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>,
|inner| inner.iter().map(|(n, v)| (n.cloned(), v))
)
}
fn items_valid(&self) -> PyBorrowingIterator {
py_borrowing_iter!(
self.inner.clone(),
LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>,
|inner| inner
.iter()
.filter(|(_, v)| v.as_ref().is_ok_and(|opt| opt.is_some()))
.map(|(n, v)| (n.cloned(), v.unwrap().unwrap()))
)
}
fn values(&self) -> PyBorrowingIterator {
self.__iter__()
}
fn values_valid(&self) -> PyBorrowingIterator {
self.iter_valid()
}
fn sorted_by_id(
&self,
) -> Result<NodeState<'static, Option<DateTime<Utc>>, DynamicGraph>, TimeError> {
self.compute().map(|ns| ns.sort_by_id())
}
fn sorted_by_id_valid(&self) -> NodeState<'static, Option<DateTime<Utc>>, DynamicGraph> {
self.compute_valid().sort_by_id()
}
fn __repr__(&self) -> String {
self.inner.repr()
}
fn to_df<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let pandas = PyModule::import(py, "pandas")?;
let columns = PyDict::new(py);
columns.set_item("node", self.inner.nodes().id())?;
columns.set_item("value", self.values())?;
pandas.call_method("DataFrame", (columns,), None)
}
}
#[pymethods]
impl EarliestDateTimeView {
#[pyo3(signature = (reverse = false))]
fn sorted(
&self,
reverse: bool,
) -> PyResult<NodeState<'static, Option<DateTime<Utc>>, DynamicGraph>> {
if let Some(err) = self.inner.iter_values().find_map(|r| r.err()) {
return Err(PyErr::from(err));
}
let op: ArrowMap<
Map<
ArrowMap<
Map<ops::EarliestTime<DynamicGraph>, Result<Option<DateTime<Utc>>, TimeError>>,
DateTimeStruct,
>,
Option<DateTime<Utc>>,
>,
DateTimeStruct,
> = self
.inner
.op
.clone()
.map(|r| r.unwrap())
.into_arrow_node_op();
let lazy_node_state = LazyNodeState::new(op, self.inner.nodes());
Ok(if reverse {
lazy_node_state.sort_by_values_by(|a, b| a.cmp(b).reverse())
} else {
lazy_node_state.sort_by_values_by(|a, b| match (a, b) {
(Some(a), Some(b)) => a.cmp(b),
(None, Some(_)) => Ordering::Greater,
(Some(_), None) => Ordering::Less,
(None, None) => Ordering::Equal,
})
})
}
fn top_k(
&self,
k: usize,
) -> Result<NodeState<'static, Option<DateTime<Utc>>, DynamicGraph>, TimeError> {
self.compute().map(|ns| {
ns.top_k_by(
|a, b| a.cmp(b), k,
)
})
}
fn bottom_k(
&self,
k: usize,
) -> Result<NodeState<'static, Option<DateTime<Utc>>, DynamicGraph>, TimeError> {
self.compute().map(|ns| {
ns.bottom_k_by(
|a, b| match (a, b) {
(Some(a), Some(b)) => a.cmp(b),
(None, Some(_)) => Ordering::Greater,
(Some(_), None) => Ordering::Less,
(None, None) => Ordering::Equal,
},
k,
)
})
}
fn min_item(&self) -> PyResult<Option<(NodeView<'static, DynamicGraph>, DateTime<Utc>)>> {
let min = self.inner.min_item_by(|a, b| match (a, b) {
(Ok(a), Ok(b)) => a.cmp(b),
(Err(_), Ok(_)) => Ordering::Greater,
(Ok(_), Err(_)) => Ordering::Less,
_ => Ordering::Equal,
});
match min {
Some((n, Ok(Some(o)))) => Ok(Some((n.cloned(), o.clone()))),
Some((_, Ok(None))) => Ok(None),
Some((_, Err(e))) => Err(PyErr::from(e.clone())),
None => Ok(None),
}
}
fn min(&self) -> PyResult<Option<DateTime<Utc>>> {
self.min_item().map(|v| v.map(|(_, date)| date))
}
fn max_item(&self) -> PyResult<Option<(NodeView<'static, DynamicGraph>, DateTime<Utc>)>> {
let max = self.inner.max_item_by(|a, b| match (a, b) {
(Ok(a), Ok(b)) => a.cmp(b),
(Err(_), Ok(_)) => Ordering::Less,
(Ok(_), Err(_)) => Ordering::Greater,
_ => Ordering::Equal,
});
match max {
Some((n, Ok(Some(o)))) => Ok(Some((n.cloned(), o.clone()))),
Some((_, Ok(None))) => Ok(None),
Some((_, Err(e))) => Err(PyErr::from(e.clone())),
None => Ok(None),
}
}
fn max(&self) -> PyResult<Option<DateTime<Utc>>> {
self.max_item().map(|v| v.map(|(_, date)| date))
}
fn median(&self) -> Option<DateTime<Utc>> {
self.median_item().map(|(_, v)| v)
}
fn median_item(&self) -> Option<(NodeView<'static, DynamicGraph>, DateTime<Utc>)> {
let mut values: Vec<_> = self
.inner
.par_iter()
.filter_map(|(n, result)| match result {
Ok(Some(o)) => Some((n.cloned(), o.clone())),
_ => None,
})
.collect();
let len = values.len();
if len == 0 {
return None;
}
values.par_sort_by(|(_, v1), (_, v2)| v1.cmp(v2));
let median_index = len / 2;
values.into_iter().nth(median_index).map(|(n, o)| (n, o)) }
fn groups(&self) -> PyResult<NodeGroups<Option<DateTime<Utc>>, DynamicGraph>> {
if let Some(err) = self.inner.iter_values().find_map(|r| r.err()) {
return Err(PyErr::from(err));
}
Ok(self.inner.group_by(|result| match result {
Ok(Some(dt)) => Some(dt),
_ => None, }))
}
}
impl From<LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>>
for EarliestDateTimeView
{
fn from(
inner: LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>,
) -> Self {
EarliestDateTimeView { inner }
}
}
/// Convert a lazy earliest-datetime node state to Python by wrapping it in an
/// `EarliestDateTimeView`.
impl<'py> pyo3::IntoPyObject<'py>
    for LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>
{
    type Target = EarliestDateTimeView;
    type Output = Bound<'py, Self::Target>;
    type Error = <Self::Target as pyo3::IntoPyObject<'py>>::Error;

    fn into_pyobject(self, py: Python<'py>) -> Result<Self::Output, Self::Error> {
        let view = EarliestDateTimeView::from(self);
        view.into_pyobject(py)
    }
}
/// Extract the lazy earliest-datetime node state from a Python `EarliestDateTimeView`.
///
/// Fails with a downcast error if the object is not an `EarliestDateTimeView`.
impl<'py> FromPyObject<'py>
    for LazyNodeState<'static, EarliestDateTime, DynamicGraph, DynamicGraph, DynNodeFilter>
{
    fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
        let view = ob.downcast::<EarliestDateTimeView>()?;
        Ok(view.get().inner.clone())
    }
}