use crate::{
model::graph::{
filtering::{GqlNodeFilter, PathFromNodeViewCollection},
node::GqlNode,
timeindex::{GqlEventTime, GqlTimeInput},
windowset::GqlPathFromNodeWindowSet,
GqlAlignmentUnit, WindowDuration,
},
rayon::blocking_compute,
};
use dynamic_graphql::{ResolvedObject, ResolvedObjectFields};
use raphtory::{
core::utils::time::TryIntoInterval,
db::{
api::view::{filter_ops::NodeSelect, DynamicGraph, Filter},
graph::{path::PathFromNode, views::filter::model::CompositeNodeFilter},
},
errors::GraphError,
prelude::*,
};
use raphtory_api::core::utils::time::IntoTime;
#[derive(ResolvedObject, Clone)]
#[graphql(name = "PathFromNode")]
pub(crate) struct GqlPathFromNode {
pub(crate) nn: PathFromNode<'static, DynamicGraph>,
}
impl GqlPathFromNode {
fn update<N: Into<PathFromNode<'static, DynamicGraph>>>(&self, nodes: N) -> Self {
GqlPathFromNode::new(nodes)
}
}
impl GqlPathFromNode {
pub(crate) fn new<N: Into<PathFromNode<'static, DynamicGraph>>>(nodes: N) -> Self {
Self { nn: nodes.into() }
}
fn iter(&self) -> Box<dyn Iterator<Item = GqlNode> + '_> {
let iter = self.nn.iter().map(GqlNode::from);
Box::new(iter)
}
}
#[ResolvedObjectFields]
impl GqlPathFromNode {
async fn layers(&self, names: Vec<String>) -> Self {
let self_clone = self.clone();
blocking_compute(move || self_clone.update(self_clone.nn.valid_layers(names))).await
}
async fn exclude_layers(&self, names: Vec<String>) -> Self {
let self_clone = self.clone();
blocking_compute(move || self_clone.update(self_clone.nn.exclude_valid_layers(names))).await
}
async fn layer(&self, name: String) -> Self {
self.update(self.nn.valid_layers(name))
}
async fn exclude_layer(&self, name: String) -> Self {
self.update(self.nn.exclude_valid_layers(name))
}
async fn rolling(
&self,
window: WindowDuration,
step: Option<WindowDuration>,
alignment_unit: Option<GqlAlignmentUnit>,
) -> Result<GqlPathFromNodeWindowSet, GraphError> {
let window = window.try_into_interval()?;
let step = step.map(|x| x.try_into_interval()).transpose()?;
let ws = if let Some(unit) = alignment_unit {
self.nn.rolling_aligned(window, step, unit.into())?
} else {
self.nn.rolling(window, step)?
};
Ok(GqlPathFromNodeWindowSet::new(ws))
}
async fn expanding(
&self,
step: WindowDuration,
alignment_unit: Option<GqlAlignmentUnit>,
) -> Result<GqlPathFromNodeWindowSet, GraphError> {
let step = step.try_into_interval()?;
let ws = if let Some(unit) = alignment_unit {
self.nn.expanding_aligned(step, unit.into())?
} else {
self.nn.expanding(step)?
};
Ok(GqlPathFromNodeWindowSet::new(ws))
}
async fn window(&self, start: GqlTimeInput, end: GqlTimeInput) -> Self {
self.update(self.nn.window(start.into_time(), end.into_time()))
}
async fn at(&self, time: GqlTimeInput) -> Self {
self.update(self.nn.at(time.into_time()))
}
async fn snapshot_latest(&self) -> Self {
let self_clone = self.clone();
blocking_compute(move || self_clone.update(self_clone.nn.snapshot_latest())).await
}
async fn snapshot_at(&self, time: GqlTimeInput) -> Self {
self.update(self.nn.snapshot_at(time.into_time()))
}
async fn latest(&self) -> Self {
let self_clone = self.clone();
blocking_compute(move || self_clone.update(self_clone.nn.latest())).await
}
async fn before(&self, time: GqlTimeInput) -> Self {
self.update(self.nn.before(time.into_time()))
}
async fn after(&self, time: GqlTimeInput) -> Self {
self.update(self.nn.after(time.into_time()))
}
async fn shrink_window(&self, start: GqlTimeInput, end: GqlTimeInput) -> Self {
self.update(self.nn.shrink_window(start.into_time(), end.into_time()))
}
async fn shrink_start(&self, start: GqlTimeInput) -> Self {
self.update(self.nn.shrink_start(start.into_time()))
}
async fn shrink_end(&self, end: GqlTimeInput) -> Self {
self.update(self.nn.shrink_end(end.into_time()))
}
async fn type_filter(&self, node_types: Vec<String>) -> Self {
let self_clone = self.clone();
blocking_compute(move || self_clone.update(self_clone.nn.type_filter(&node_types))).await
}
async fn start(&self) -> GqlEventTime {
self.nn.start().into()
}
async fn end(&self) -> GqlEventTime {
self.nn.end().into()
}
async fn count(&self) -> usize {
let self_clone = self.clone();
blocking_compute(move || self_clone.nn.len()).await
}
async fn page(
&self,
limit: usize,
offset: Option<usize>,
page_index: Option<usize>,
) -> Vec<GqlNode> {
let self_clone = self.clone();
blocking_compute(move || {
let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0);
self_clone.iter().skip(start).take(limit).collect()
})
.await
}
async fn list(&self) -> Vec<GqlNode> {
let self_clone = self.clone();
blocking_compute(move || self_clone.iter().collect()).await
}
async fn ids(&self) -> Vec<String> {
let self_clone = self.clone();
blocking_compute(move || self_clone.nn.name().collect()).await
}
async fn apply_views(
&self,
views: Vec<PathFromNodeViewCollection>,
) -> Result<GqlPathFromNode, GraphError> {
let mut return_view: GqlPathFromNode = self.clone();
for view in views {
return_view = match view {
PathFromNodeViewCollection::Layers(layers) => return_view.layers(layers).await,
PathFromNodeViewCollection::ExcludeLayers(layers) => {
return_view.exclude_layers(layers).await
}
PathFromNodeViewCollection::ExcludeLayer(layer) => {
return_view.exclude_layer(layer).await
}
PathFromNodeViewCollection::Window(window) => {
return_view.window(window.start, window.end).await
}
PathFromNodeViewCollection::ShrinkWindow(window) => {
return_view.shrink_window(window.start, window.end).await
}
PathFromNodeViewCollection::ShrinkStart(time) => {
return_view.shrink_start(time).await
}
PathFromNodeViewCollection::ShrinkEnd(time) => return_view.shrink_end(time).await,
PathFromNodeViewCollection::At(time) => return_view.at(time).await,
PathFromNodeViewCollection::SnapshotLatest(apply) => {
if apply {
return_view.snapshot_latest().await
} else {
return_view
}
}
PathFromNodeViewCollection::SnapshotAt(time) => return_view.snapshot_at(time).await,
PathFromNodeViewCollection::Latest(apply) => {
if apply {
return_view.latest().await
} else {
return_view
}
}
PathFromNodeViewCollection::Before(time) => return_view.before(time).await,
PathFromNodeViewCollection::After(time) => return_view.after(time).await,
}
}
Ok(return_view)
}
async fn filter(&self, expr: GqlNodeFilter) -> Result<Self, GraphError> {
let self_clone = self.clone();
blocking_compute(move || {
let filter: CompositeNodeFilter = expr.try_into()?;
let filtered = self_clone.nn.filter(filter)?;
Ok(self_clone.update(filtered.into_dyn()))
})
.await
}
async fn select(&self, expr: GqlNodeFilter) -> Result<Self, GraphError> {
let self_clone = self.clone();
blocking_compute(move || {
let filter: CompositeNodeFilter = expr.try_into()?;
let filtered = self_clone.nn.select(filter)?;
Ok(self_clone.update(filtered.into_dyn()))
})
.await
}
}