use crate::{
model::graph::{
edges::GqlEdges,
filtering::{GqlEdgeFilter, GqlNodeFilter, NodeViewCollection},
history::GqlHistory,
nodes::GqlNodes,
path_from_node::GqlPathFromNode,
property::{GqlMetadata, GqlProperties},
timeindex::{GqlEventTime, GqlTimeInput},
windowset::GqlNodeWindowSet,
GqlAlignmentUnit, WindowDuration,
},
rayon::blocking_compute,
};
use dynamic_graphql::{ResolvedObject, ResolvedObjectFields};
use raphtory::{
algorithms::components::{in_component, out_component},
core::utils::time::TryIntoInterval,
db::{
api::{
properties::dyn_props::DynProperties,
view::{filter_ops::NodeSelect, Filter, *},
},
graph::{
node::NodeView,
views::filter::model::{
edge_filter::CompositeEdgeFilter, node_filter::CompositeNodeFilter,
},
},
},
errors::GraphError,
prelude::NodeStateOps,
};
use raphtory_api::core::utils::time::IntoTime;
#[derive(ResolvedObject, Clone)]
#[graphql(name = "Node")]
pub struct GqlNode {
pub(crate) vv: NodeView<'static, DynamicGraph>,
}
impl<G: StaticGraphViewOps + IntoDynamic> From<NodeView<'static, G>> for GqlNode {
fn from(value: NodeView<'static, G>) -> Self {
Self {
vv: NodeView::new_internal(value.graph.into_dynamic(), value.node),
}
}
}
#[ResolvedObjectFields]
impl GqlNode {
async fn id(&self) -> String {
self.vv.id().to_string()
}
pub async fn name(&self) -> String {
self.vv.name()
}
async fn default_layer(&self) -> GqlNode {
self.vv.default_layer().into()
}
async fn layers(&self, names: Vec<String>) -> GqlNode {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.valid_layers(names).into()).await
}
async fn exclude_layers(&self, names: Vec<String>) -> GqlNode {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.exclude_valid_layers(names).into()).await
}
async fn layer(&self, name: String) -> GqlNode {
self.vv.valid_layers(name).into()
}
async fn exclude_layer(&self, name: String) -> GqlNode {
self.vv.exclude_valid_layers(name).into()
}
async fn rolling(
&self,
window: WindowDuration,
step: Option<WindowDuration>,
alignment_unit: Option<GqlAlignmentUnit>,
) -> Result<GqlNodeWindowSet, GraphError> {
let window = window.try_into_interval()?;
let step = step.map(|x| x.try_into_interval()).transpose()?;
let ws = if let Some(unit) = alignment_unit {
self.vv.rolling_aligned(window, step, unit.into())?
} else {
self.vv.rolling(window, step)?
};
Ok(GqlNodeWindowSet::new(ws))
}
async fn expanding(
&self,
step: WindowDuration,
alignment_unit: Option<GqlAlignmentUnit>,
) -> Result<GqlNodeWindowSet, GraphError> {
let step = step.try_into_interval()?;
let ws = if let Some(unit) = alignment_unit {
self.vv.expanding_aligned(step, unit.into())?
} else {
self.vv.expanding(step)?
};
Ok(GqlNodeWindowSet::new(ws))
}
async fn window(&self, start: GqlTimeInput, end: GqlTimeInput) -> GqlNode {
self.vv.window(start.into_time(), end.into_time()).into()
}
async fn at(&self, time: GqlTimeInput) -> GqlNode {
self.vv.at(time.into_time()).into()
}
async fn latest(&self) -> GqlNode {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.latest().into()).await
}
async fn snapshot_at(&self, time: GqlTimeInput) -> GqlNode {
self.vv.snapshot_at(time.into_time()).into()
}
async fn snapshot_latest(&self) -> GqlNode {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.snapshot_latest().into()).await
}
async fn before(&self, time: GqlTimeInput) -> GqlNode {
self.vv.before(time.into_time()).into()
}
async fn after(&self, time: GqlTimeInput) -> GqlNode {
self.vv.after(time.into_time()).into()
}
async fn shrink_window(&self, start: GqlTimeInput, end: GqlTimeInput) -> Self {
self.vv
.shrink_window(start.into_time(), end.into_time())
.into()
}
async fn shrink_start(&self, start: GqlTimeInput) -> Self {
self.vv.shrink_start(start.into_time()).into()
}
async fn shrink_end(&self, end: GqlTimeInput) -> Self {
self.vv.shrink_end(end.into_time()).into()
}
async fn apply_views(&self, views: Vec<NodeViewCollection>) -> Result<GqlNode, GraphError> {
let mut return_view: GqlNode = self.vv.clone().into();
for view in views {
return_view = match view {
NodeViewCollection::DefaultLayer(apply) => {
if apply {
return_view.default_layer().await
} else {
return_view
}
}
NodeViewCollection::Latest(apply) => {
if apply {
return_view.latest().await
} else {
return_view
}
}
NodeViewCollection::SnapshotLatest(apply) => {
if apply {
return_view.snapshot_latest().await
} else {
return_view
}
}
NodeViewCollection::SnapshotAt(at) => return_view.snapshot_at(at).await,
NodeViewCollection::Layers(layers) => return_view.layers(layers).await,
NodeViewCollection::ExcludeLayers(layers) => {
return_view.exclude_layers(layers).await
}
NodeViewCollection::ExcludeLayer(layer) => return_view.exclude_layer(layer).await,
NodeViewCollection::Window(window) => {
return_view.window(window.start, window.end).await
}
NodeViewCollection::At(at) => return_view.at(at).await,
NodeViewCollection::Before(time) => return_view.before(time).await,
NodeViewCollection::After(time) => return_view.after(time).await,
NodeViewCollection::ShrinkWindow(window) => {
return_view.shrink_window(window.start, window.end).await
}
NodeViewCollection::ShrinkStart(time) => return_view.shrink_start(time).await,
NodeViewCollection::ShrinkEnd(time) => return_view.shrink_end(time).await,
NodeViewCollection::NodeFilter(filter) => return_view.filter(filter).await?,
}
}
Ok(return_view)
}
async fn earliest_time(&self) -> GqlEventTime {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.earliest_time().into()).await
}
async fn first_update(&self) -> GqlEventTime {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.history().earliest_time().into()).await
}
async fn latest_time(&self) -> GqlEventTime {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.latest_time().into()).await
}
async fn last_update(&self) -> GqlEventTime {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.history().latest_time().into()).await
}
async fn start(&self) -> GqlEventTime {
self.vv.start().into()
}
async fn end(&self) -> GqlEventTime {
self.vv.end().into()
}
async fn history(&self) -> GqlHistory {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.history().into()).await
}
async fn edge_history_count(&self) -> usize {
self.vv.edge_history_count()
}
async fn is_active(&self) -> bool {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.is_active()).await
}
pub async fn node_type(&self) -> Option<String> {
match self.vv.node_type() {
None => None,
str => str.map(|s| (*s).to_string()),
}
}
async fn properties(&self) -> GqlProperties {
Into::<DynProperties>::into(self.vv.properties()).into()
}
async fn metadata(&self) -> GqlMetadata {
self.vv.metadata().into()
}
async fn degree(&self) -> usize {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.degree()).await
}
async fn out_degree(&self) -> usize {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.out_degree()).await
}
async fn in_degree(&self) -> usize {
let self_clone = self.clone();
blocking_compute(move || self_clone.vv.in_degree()).await
}
async fn in_component(&self) -> GqlNodes {
let self_clone = self.clone();
blocking_compute(move || GqlNodes::new(in_component(self_clone.vv.clone()).nodes())).await
}
async fn out_component(&self) -> GqlNodes {
let self_clone = self.clone();
blocking_compute(move || GqlNodes::new(out_component(self_clone.vv.clone()).nodes())).await
}
async fn edges(&self, select: Option<GqlEdgeFilter>) -> Result<GqlEdges, GraphError> {
let base = self.vv.edges();
if let Some(sel) = select {
let ef: CompositeEdgeFilter = sel.try_into()?;
let narrowed = blocking_compute(move || base.select(ef)).await?;
return Ok(GqlEdges::new(narrowed));
}
Ok(GqlEdges::new(base))
}
async fn out_edges(&self, select: Option<GqlEdgeFilter>) -> Result<GqlEdges, GraphError> {
let base = self.vv.out_edges();
if let Some(sel) = select {
let ef: CompositeEdgeFilter = sel.try_into()?;
let narrowed = blocking_compute(move || base.select(ef)).await?;
return Ok(GqlEdges::new(narrowed));
}
Ok(GqlEdges::new(base))
}
async fn in_edges(&self, select: Option<GqlEdgeFilter>) -> Result<GqlEdges, GraphError> {
let base = self.vv.in_edges();
if let Some(sel) = select {
let ef: CompositeEdgeFilter = sel.try_into()?;
let narrowed = blocking_compute(move || base.select(ef)).await?;
return Ok(GqlEdges::new(narrowed));
}
Ok(GqlEdges::new(base))
}
async fn neighbours<'a>(
&self,
select: Option<GqlNodeFilter>,
) -> Result<GqlPathFromNode, GraphError> {
let base = self.vv.neighbours();
if let Some(expr) = select {
let nf: CompositeNodeFilter = expr.try_into()?;
let narrowed = blocking_compute(move || base.select(nf)).await?;
return Ok(GqlPathFromNode::new(narrowed));
}
Ok(GqlPathFromNode::new(base))
}
async fn in_neighbours<'a>(
&self,
select: Option<GqlNodeFilter>,
) -> Result<GqlPathFromNode, GraphError> {
let base = self.vv.in_neighbours();
if let Some(expr) = select {
let nf: CompositeNodeFilter = expr.try_into()?;
let narrowed = blocking_compute(move || base.select(nf)).await?;
return Ok(GqlPathFromNode::new(narrowed));
}
Ok(GqlPathFromNode::new(base))
}
async fn out_neighbours(
&self,
select: Option<GqlNodeFilter>,
) -> Result<GqlPathFromNode, GraphError> {
let base = self.vv.out_neighbours();
if let Some(expr) = select {
let nf: CompositeNodeFilter = expr.try_into()?;
let narrowed = blocking_compute(move || base.select(nf)).await?;
return Ok(GqlPathFromNode::new(narrowed));
}
Ok(GqlPathFromNode::new(base))
}
async fn filter(&self, expr: GqlNodeFilter) -> Result<Self, GraphError> {
let self_clone = self.clone();
blocking_compute(move || {
let filter: CompositeNodeFilter = expr.try_into()?;
let filtered = self_clone.vv.filter(filter)?;
Ok(self_clone.update(filtered.into_dynamic()))
})
.await
}
}
impl GqlNode {
fn update<N: Into<NodeView<'static, DynamicGraph>>>(&self, node: N) -> Self {
Self { vv: node.into() }
}
}