use crate::python::client::{
build_property_string, build_query, raphtory_client::PyRaphtoryClient,
};
use minijinja::context;
use pyo3::{pyclass, pymethods, Python};
use raphtory::errors::GraphError;
use raphtory_api::core::{
entities::properties::prop::Prop,
storage::timeindex::{AsTime, EventTime},
utils::time::IntoTime,
};
use std::collections::HashMap;
/// Handle to one edge of a graph that is reached through a `PyRaphtoryClient`.
///
/// The methods on this type render a GraphQL request of the form
/// `updateGraph(path: ...) { edge(src: ..., dst: ...) { ... } }` from the fields
/// below and send it through `client`. Exposed to Python as
/// `raphtory.graphql.RemoteEdge`.
#[derive(Clone)]
#[pyclass(name = "RemoteEdge", module = "raphtory.graphql")]
pub struct PyRemoteEdge {
/// Value substituted for the `path` argument of `updateGraph` in every request.
pub(crate) path: String,
/// Client used to send the rendered requests.
pub(crate) client: PyRaphtoryClient,
/// Value substituted for the `src` argument of `edge(...)` in every request.
pub(crate) src: String,
/// Value substituted for the `dst` argument of `edge(...)` in every request.
pub(crate) dst: String,
}
impl PyRemoteEdge {
pub(crate) fn new(path: String, client: PyRaphtoryClient, src: String, dst: String) -> Self {
PyRemoteEdge {
path,
client,
src,
dst,
}
}
}
#[pymethods]
impl PyRemoteEdge {
    /// Send an `addUpdates` request for this edge at time `t`.
    ///
    /// Renders an `updateGraph(path) { edge(src, dst) { addUpdates(...) } }` request
    /// and sends it through the stored client. The response is discarded.
    ///
    /// Arguments:
    ///     t: Time of the update. Only the value produced by `t.into_time().t()` is
    ///         placed in the request.
    ///     properties (optional): Property names mapped to values. When given, the map
    ///         is serialised with `build_property_string` and sent as `properties`;
    ///         when `None` the argument is left out of the request.
    ///     layer (str, optional): Sent as `layer` when given; left out when `None`.
    ///
    /// Returns:
    ///     None
    ///
    /// Fails if the request cannot be rendered from the template or if sending it
    /// through the client reports an error.
    #[pyo3(signature = (t, properties=None, layer=None))]
    fn add_updates(
        &self,
        py: Python,
        t: EventTime,
        properties: Option<HashMap<String, Prop>>,
        layer: Option<&str>,
    ) -> Result<(), GraphError> {
        // The template text is left unindented on purpose: it is sent verbatim.
        let template = r#"
{
updateGraph(path: "{{path}}") {
edge(src: "{{src}}",dst: "{{dst}}") {
addUpdates(time: {{t}} {% if properties is not none %}, properties: {{ properties | safe }} {% endif %} {% if layer is not none %}, layer: "{{layer}}" {% endif %})
}
}
}
"#;
        let query_context = context! {
            path => self.path,
            src => self.src,
            dst => self.dst,
            t => t.into_time().t(),
            properties => properties.map(build_property_string),
            layer => layer
        };
        let query = build_query(template, query_context)?;
        // Only success or failure matters here; the response body is not used.
        let _ = self.client.query(py, query, None)?;
        Ok(())
    }

    /// Send a `delete` request for this edge at time `t`.
    ///
    /// Renders an `updateGraph(path) { edge(src, dst) { delete(...) } }` request and
    /// sends it through the stored client. The response is discarded.
    ///
    /// Arguments:
    ///     t: Time of the deletion. Only the value produced by `t.into_time().t()` is
    ///         placed in the request.
    ///     layer (str, optional): Sent as `layer` when given; left out when `None`.
    ///
    /// Returns:
    ///     None
    ///
    /// Fails if the request cannot be rendered from the template or if sending it
    /// through the client reports an error.
    #[pyo3(signature = (t, layer=None))]
    fn delete(&self, py: Python, t: EventTime, layer: Option<&str>) -> Result<(), GraphError> {
        // The separating comma lives inside the `layer` conditional, as in the other
        // templates of this impl, so no dangling or doubled comma is rendered.
        let template = r#"
{
updateGraph(path: "{{path}}") {
edge(src: "{{src}}",dst: "{{dst}}") {
delete(time: {{t}} {% if layer is not none %}, layer: "{{layer}}" {% endif %})
}
}
}
"#;
        let query_context = context! {
            path => self.path,
            src => self.src,
            dst => self.dst,
            t => t.into_time().t(),
            layer => layer
        };
        let query = build_query(template, query_context)?;
        // Only success or failure matters here; the response body is not used.
        let _ = self.client.query(py, query, None)?;
        Ok(())
    }

    /// Send an `addMetadata` request for this edge.
    ///
    /// Renders an `updateGraph(path) { edge(src, dst) { addMetadata(...) } }` request
    /// and sends it through the stored client. The response is discarded. How the
    /// server treats keys that already exist is not determined by this code.
    ///
    /// Arguments:
    ///     properties: Property names mapped to values; serialised with
    ///         `build_property_string` and sent as `properties`.
    ///     layer (str, optional): Sent as `layer` when given; left out when `None`.
    ///
    /// Returns:
    ///     None
    ///
    /// Fails if the request cannot be rendered from the template or if sending it
    /// through the client reports an error.
    #[pyo3(signature = (properties, layer=None))]
    fn add_metadata(
        &self,
        py: Python,
        properties: HashMap<String, Prop>,
        layer: Option<&str>,
    ) -> Result<(), GraphError> {
        let template = r#"
{
updateGraph(path: "{{path}}") {
edge(src: "{{src}}",dst: "{{dst}}") {
addMetadata(properties: {{ properties | safe }} {% if layer is not none %}, layer: "{{layer}}" {% endif %})
}
}
}
"#;
        let query_context = context! {
            path => self.path,
            src => self.src,
            dst => self.dst,
            properties => build_property_string(properties),
            layer => layer
        };
        let query = build_query(template, query_context)?;
        // Only success or failure matters here; the response body is not used.
        let _ = self.client.query(py, query, None)?;
        Ok(())
    }

    /// Send an `updateMetadata` request for this edge.
    ///
    /// Renders an `updateGraph(path) { edge(src, dst) { updateMetadata(...) } }`
    /// request and sends it through the stored client. The response is discarded.
    /// This differs from `add_metadata` only in the GraphQL field that is called; the
    /// semantic difference between the two is defined by the server.
    ///
    /// Arguments:
    ///     properties: Property names mapped to values; serialised with
    ///         `build_property_string` and sent as `properties`.
    ///     layer (str, optional): Sent as `layer` when given; left out when `None`.
    ///
    /// Returns:
    ///     None
    ///
    /// Fails if the request cannot be rendered from the template or if sending it
    /// through the client reports an error.
    #[pyo3(signature = (properties, layer=None))]
    pub fn update_metadata(
        &self,
        py: Python,
        properties: HashMap<String, Prop>,
        layer: Option<&str>,
    ) -> Result<(), GraphError> {
        let template = r#"
{
updateGraph(path: "{{path}}") {
edge(src: "{{src}}",dst: "{{dst}}") {
updateMetadata(properties: {{ properties | safe }} {% if layer is not none %}, layer: "{{layer}}" {% endif %})
}
}
}
"#;
        let query_context = context! {
            path => self.path,
            src => self.src,
            dst => self.dst,
            properties => build_property_string(properties),
            layer => layer
        };
        let query = build_query(template, query_context)?;
        // Only success or failure matters here; the response body is not used.
        let _ = self.client.query(py, query, None)?;
        Ok(())
    }
}