use crate::{
graph::{GraphWithVectors, UpdateEmbeddings},
model::graph::{edge::GqlEdge, graph::GqlGraph, node::GqlNode, property::Value},
paths::ExistingGraphFolder,
rayon::blocking_write,
};
use dynamic_graphql::{InputObject, ResolvedObject, ResolvedObjectFields};
use itertools::Itertools;
use raphtory::{
db::graph::{edge::EdgeView, node::NodeView},
errors::GraphError,
prelude::*,
};
use raphtory_api::core::storage::arc_str::OptionAsStr;
use std::{
error::Error,
fmt::{Debug, Display, Formatter},
};
/// Error describing what went wrong while applying a batch of graph updates.
///
/// The `Display` implementation prints the write failure (if any) followed by
/// the per-item failures.
#[derive(Debug)]
pub struct BatchFailures {
/// Per-item failures, each paired with the position of the item in the submitted batch.
batch_failures: Vec<(usize, GraphError)>,
/// Error returned when writing the accumulated updates, if that step failed.
write_failure: Option<GraphError>,
}
/// Separates the outcome of a batch update into successes and failures.
///
/// * `results` - one result per submitted item, in submission order.
/// * `write_result` - outcome of writing the updates once the batch was applied.
///
/// Returns the successful values (in order) together with `Some(BatchFailures)`
/// when at least one item failed **or** the write itself failed, and `None`
/// when everything succeeded.
fn split_failures<S>(
    results: impl IntoIterator<Item = Result<S, GraphError>>,
    write_result: Result<(), GraphError>,
) -> (Vec<S>, Option<BatchFailures>) {
    let mut succeeded = Vec::new();
    let mut batch_failures = Vec::new();
    for (i, res) in results.into_iter().enumerate() {
        match res {
            Ok(s) => succeeded.push(s),
            // Keep the item's position so the caller can tell which input failed.
            Err(err) => batch_failures.push((i, err)),
        }
    }
    let write_failure = write_result.err();
    // A failed write must be reported even when every individual item succeeded;
    // otherwise the caller would report success for updates that were never written.
    let any_failure = !batch_failures.is_empty() || write_failure.is_some();
    let err = any_failure.then(|| BatchFailures {
        batch_failures,
        write_failure,
    });
    (succeeded, err)
}
impl Display for BatchFailures {
    /// Renders the write failure (when present) followed by the per-item
    /// failures as `index -> error`, the two sections separated by `", "`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let has_item_failures = !self.batch_failures.is_empty();
        if let Some(write_err) = &self.write_failure {
            write!(f, "Writing failed: {write_err}")?;
            // Separator only when a second section follows.
            if has_item_failures {
                f.write_str(", ")?;
            }
        }
        if has_item_failures {
            let listing = self
                .batch_failures
                .iter()
                .map(|(idx, err)| format!("{idx} -> {err}"))
                .join(", ");
            f.write_str("Batch updates failed: ")?;
            f.write_str(&listing)?;
        }
        Ok(())
    }
}
// Relies on the default `Error` methods; `Debug` and `Display` are provided above.
impl Error for BatchFailures {}
// GraphQL input for a single property, exposed in the schema as `PropertyInput`.
#[derive(InputObject, Clone)]
#[graphql(name = "PropertyInput")]
pub struct GqlPropertyInput {
// Property name.
key: String,
// Property value; converted to a `Prop` with `Prop::try_from` before being applied.
value: Value,
}
// GraphQL input for one timestamped update.
#[derive(InputObject, Clone)]
pub struct TemporalPropertyInput {
// Time at which the update is applied.
time: i64,
// Properties set at `time`; a missing list is treated as empty by the batch mutations.
properties: Option<Vec<GqlPropertyInput>>,
}
// GraphQL input describing one node in a batch `add_nodes` call.
#[derive(InputObject, Clone)]
pub struct NodeAddition {
// Name identifying the node.
name: String,
// Optional node type to set on the node.
node_type: Option<String>,
// Optional metadata to add to the node.
metadata: Option<Vec<GqlPropertyInput>>,
// Optional timestamped updates; each entry adds the node at that time.
updates: Option<Vec<TemporalPropertyInput>>,
}
// GraphQL input describing one edge in a batch `add_edges` call.
#[derive(InputObject, Clone)]
pub struct EdgeAddition {
// Name of the source node.
src: String,
// Name of the destination node.
dst: String,
// Optional layer used for both the updates and the metadata.
layer: Option<String>,
// Optional metadata to add to the edge.
metadata: Option<Vec<GqlPropertyInput>>,
// Optional timestamped updates; each entry adds the edge at that time.
updates: Option<Vec<TemporalPropertyInput>>,
}
// GraphQL object (`MutableGraph`) exposing mutations on a single graph.
#[derive(ResolvedObject, Clone)]
#[graphql(name = "MutableGraph")]
pub struct GqlMutableGraph {
// Folder of the graph; handed to `GqlGraph::new` when a read view is requested.
path: ExistingGraphFolder,
// The graph being mutated.
graph: GraphWithVectors,
}
impl GqlMutableGraph {
    /// Builds a mutation handle from a graph folder and its loaded graph.
    pub(crate) fn new(path: ExistingGraphFolder, graph: GraphWithVectors) -> Self {
        // `path` already has the field's type, so it is stored as-is.
        Self { path, graph }
    }
}
fn as_properties(
properties: Vec<GqlPropertyInput>,
) -> Result<impl Iterator<Item = (String, Prop)>, GraphError> {
let props: Result<Vec<(String, Prop)>, GraphError> = properties
.into_iter()
.map(|p| {
let v = Prop::try_from(p.value)?;
Ok((p.key, v))
})
.collect();
props.map(|vec| vec.into_iter())
}
#[ResolvedObjectFields]
impl GqlMutableGraph {
// Returns a `GqlGraph` built from this graph's folder and underlying graph.
async fn graph(&self) -> GqlGraph {
    let folder = self.path.clone();
    let inner = self.graph.graph.clone();
    GqlGraph::new(folder, inner)
}
async fn node(&self, name: String) -> Option<GqlMutableNode> {
self.graph.node(name).map(|n| n.into())
}
// Adds a node at `time` with optional properties and type, writes the update,
// then refreshes the node's embeddings.
async fn add_node(
    &self,
    time: i64,
    name: String,
    properties: Option<Vec<GqlPropertyInput>>,
    node_type: Option<String>,
) -> Result<GqlMutableNode, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<_, GraphError> {
        let props = as_properties(properties.unwrap_or_default())?;
        let added = this.graph.add_node(time, &name, props, node_type.as_str())?;
        this.graph.write_updates()?;
        Ok(added)
    };
    let added = blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = added.update_embeddings().await;
    Ok(added.into())
}
// Same flow as `add_node`, but delegates to the graph's `create_node`.
async fn create_node(
    &self,
    time: i64,
    name: String,
    properties: Option<Vec<GqlPropertyInput>>,
    node_type: Option<String>,
) -> Result<GqlMutableNode, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<_, GraphError> {
        let props = as_properties(properties.unwrap_or_default())?;
        let created = this
            .graph
            .create_node(time, &name, props, node_type.as_str())?;
        this.graph.write_updates()?;
        Ok(created)
    };
    let created = blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = created.update_embeddings().await;
    Ok(created.into())
}
// Applies a batch of node additions.
//
// For every entry: each temporal update is applied with `add_node`, then the
// node type (if given) is set, then metadata (if non-empty) is added. An entry
// fails on its first error without stopping the rest of the batch. Updates are
// written once after the whole batch, and embeddings are refreshed for the
// nodes that succeeded.
//
// Returns `Ok(true)` when `split_failures` reports no failure, otherwise the
// collected `BatchFailures`.
async fn add_nodes(&self, nodes: Vec<NodeAddition>) -> Result<bool, BatchFailures> {
    let self_clone = self.clone();
    let (succeeded, batch_failures) = blocking_write(move || {
        // The batch is owned by this closure, so entries are consumed directly
        // instead of being cloned one by one.
        let nodes: Vec<_> = nodes
            .into_iter()
            .map(|node| {
                let name = node.name.as_str();
                for prop in node.updates.unwrap_or_default() {
                    let prop_iter = as_properties(prop.properties.unwrap_or_default())?;
                    self_clone
                        .graph
                        .add_node(prop.time, name, prop_iter, None)?;
                }
                if let Some(node_type) = node.node_type.as_str() {
                    self_clone.get_node_view(name)?.set_node_type(node_type)?;
                }
                let metadata = node.metadata.unwrap_or_default();
                if !metadata.is_empty() {
                    let prop_iter = as_properties(metadata)?;
                    self_clone.get_node_view(name)?.add_metadata(prop_iter)?;
                }
                // Yields a missing-node error when no update created the node
                // and it did not exist beforehand.
                self_clone.get_node_view(name)
            })
            .collect();
        let write_res = self_clone.graph.write_updates();
        split_failures(nodes, write_res)
    })
    .await;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.graph.update_node_embeddings(succeeded).await;
    match batch_failures {
        Some(failures) => Err(failures),
        None => Ok(true),
    }
}
async fn edge(&self, src: String, dst: String) -> Option<GqlMutableEdge> {
self.graph.edge(src, dst).map(|e| e.into())
}
// Adds an edge at `time` with optional properties and layer, writes the update,
// then refreshes the edge's embeddings.
async fn add_edge(
    &self,
    time: i64,
    src: String,
    dst: String,
    properties: Option<Vec<GqlPropertyInput>>,
    layer: Option<String>,
) -> Result<GqlMutableEdge, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<_, GraphError> {
        let props = as_properties(properties.unwrap_or_default())?;
        let added = this.graph.add_edge(time, src, dst, props, layer.as_str())?;
        this.graph.write_updates()?;
        Ok(added)
    };
    let added = blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = added.update_embeddings().await;
    Ok(added.into())
}
async fn add_edges(&self, edges: Vec<EdgeAddition>) -> Result<bool, BatchFailures> {
let self_clone = self.clone();
let (edge_pairs, failures) = blocking_write(move || {
let edge_res: Vec<_> = edges
.into_iter()
.map(|edge| {
let src = edge.src.as_str();
let dst = edge.dst.as_str();
let layer = edge.layer.as_str();
for prop in edge.updates.unwrap_or(vec![]) {
let prop_iter = as_properties(prop.properties.unwrap_or(vec![]))?;
self_clone
.graph
.add_edge(prop.time, src, dst, prop_iter, layer)?;
}
let metadata = edge.metadata.unwrap_or(vec![]);
if !metadata.is_empty() {
let prop_iter = as_properties(metadata)?;
self_clone
.get_edge_view(src.to_string(), dst.to_string())?
.add_metadata(prop_iter, layer)?;
}
Ok((edge.src, edge.dst))
})
.collect();
let write_res = self_clone.graph.write_updates();
split_failures(edge_res, write_res)
})
.await;
let _ = self.graph.update_edge_embeddings(edge_pairs).await;
match failures {
None => Ok(true),
Some(failures) => Err(failures),
}
}
// Marks the edge as deleted at `time` (optionally on one layer), writes the
// update, then refreshes the edge's embeddings.
async fn delete_edge(
    &self,
    time: i64,
    src: String,
    dst: String,
    layer: Option<String>,
) -> Result<GqlMutableEdge, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<_, GraphError> {
        let deleted = this.graph.delete_edge(time, src, dst, layer.as_str())?;
        this.graph.write_updates()?;
        Ok(deleted)
    };
    let deleted = blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = deleted.update_embeddings().await;
    Ok(deleted.into())
}
// Adds graph-level temporal properties at time `t` and writes the update.
async fn add_properties(
    &self,
    t: i64,
    properties: Vec<GqlPropertyInput>,
) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<bool, GraphError> {
        let props = as_properties(properties)?;
        this.graph.add_properties(t, props)?;
        this.graph.write_updates()?;
        Ok(true)
    };
    blocking_write(write_task).await
}
// Adds graph-level metadata and writes the update.
async fn add_metadata(&self, properties: Vec<GqlPropertyInput>) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<bool, GraphError> {
        let props = as_properties(properties)?;
        this.graph.add_metadata(props)?;
        this.graph.write_updates()?;
        Ok(true)
    };
    blocking_write(write_task).await
}
// Updates graph-level metadata and writes the update.
async fn update_metadata(&self, properties: Vec<GqlPropertyInput>) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<bool, GraphError> {
        let props = as_properties(properties)?;
        this.graph.update_metadata(props)?;
        this.graph.write_updates()?;
        Ok(true)
    };
    blocking_write(write_task).await
}
}
impl GqlMutableGraph {
    /// Fetches the node called `name`, or a `NodeMissingError` carrying that name.
    fn get_node_view(&self, name: &str) -> Result<NodeView<'static, GraphWithVectors>, GraphError> {
        match self.graph.node(name) {
            Some(view) => Ok(view),
            None => Err(GraphError::NodeMissingError(GID::Str(name.to_owned()))),
        }
    }

    /// Fetches the edge from `src` to `dst`, or an `EdgeMissingError` carrying both names.
    fn get_edge_view(
        &self,
        src: String,
        dst: String,
    ) -> Result<EdgeView<GraphWithVectors>, GraphError> {
        match self.graph.edge(src.clone(), dst.clone()) {
            Some(view) => Ok(view),
            None => Err(GraphError::EdgeMissingError {
                src: GID::Str(src),
                dst: GID::Str(dst),
            }),
        }
    }
}
// GraphQL object (`MutableNode`) exposing mutations on a single node.
#[derive(ResolvedObject, Clone)]
#[graphql(name = "MutableNode")]
pub struct GqlMutableNode {
// View of the node being mutated.
node: NodeView<'static, GraphWithVectors>,
}
impl From<NodeView<'static, GraphWithVectors>> for GqlMutableNode {
fn from(node: NodeView<'static, GraphWithVectors>) -> Self {
Self { node }
}
}
#[ResolvedObjectFields]
impl GqlMutableNode {
// Always resolves to `true`.
async fn success(&self) -> bool {
true
}
// Returns the node converted to a `GqlNode`.
async fn node(&self) -> GqlNode {
    let view = self.node.clone();
    view.into()
}
// Adds metadata to the node, writes the update, then refreshes its embeddings.
async fn add_metadata(&self, properties: Vec<GqlPropertyInput>) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<(), GraphError> {
        let props = as_properties(properties)?;
        this.node.add_metadata(props)?;
        this.node.graph.write_updates()?;
        Ok(())
    };
    blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.node.update_embeddings().await;
    Ok(true)
}
// Sets the node's type, writes the update, then refreshes its embeddings.
async fn set_node_type(&self, new_type: String) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<(), GraphError> {
        this.node.set_node_type(&new_type)?;
        this.node.graph.write_updates()?;
        Ok(())
    };
    blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.node.update_embeddings().await;
    Ok(true)
}
// Updates the node's metadata, writes the update, then refreshes its embeddings.
async fn update_metadata(&self, properties: Vec<GqlPropertyInput>) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<(), GraphError> {
        let props = as_properties(properties)?;
        this.node.update_metadata(props)?;
        this.node.graph.write_updates()?;
        Ok(())
    };
    blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.node.update_embeddings().await;
    Ok(true)
}
// Adds temporal property updates to the node at `time` (none when `properties`
// is absent), writes the update, then refreshes its embeddings.
async fn add_updates(
    &self,
    time: i64,
    properties: Option<Vec<GqlPropertyInput>>,
) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<(), GraphError> {
        let props = as_properties(properties.unwrap_or_default())?;
        this.node.add_updates(time, props)?;
        this.node.graph.write_updates()?;
        Ok(())
    };
    blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.node.update_embeddings().await;
    Ok(true)
}
}
// GraphQL object (`MutableEdge`) exposing mutations on a single edge.
#[derive(ResolvedObject, Clone)]
#[graphql(name = "MutableEdge")]
pub struct GqlMutableEdge {
// View of the edge being mutated.
edge: EdgeView<GraphWithVectors>,
}
impl From<EdgeView<GraphWithVectors>> for GqlMutableEdge {
fn from(edge: EdgeView<GraphWithVectors>) -> Self {
Self { edge }
}
}
#[ResolvedObjectFields]
impl GqlMutableEdge {
// Always resolves to `true`.
async fn success(&self) -> bool {
true
}
// Returns the edge converted to a `GqlEdge`.
async fn edge(&self) -> GqlEdge {
    let view = self.edge.clone();
    view.into()
}
async fn src(&self) -> GqlMutableNode {
self.edge.src().into()
}
async fn dst(&self) -> GqlMutableNode {
self.edge.dst().into()
}
// Marks this edge as deleted at `time` (optionally on one layer), writes the
// update, then refreshes its embeddings.
async fn delete(&self, time: i64, layer: Option<String>) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<(), GraphError> {
        this.edge.delete(time, layer.as_str())?;
        this.edge.graph.write_updates()?;
        Ok(())
    };
    blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.edge.update_embeddings().await;
    Ok(true)
}
// Adds metadata to the edge (optionally on one layer), writes the update,
// then refreshes its embeddings.
async fn add_metadata(
    &self,
    properties: Vec<GqlPropertyInput>,
    layer: Option<String>,
) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<(), GraphError> {
        let props = as_properties(properties)?;
        this.edge.add_metadata(props, layer.as_str())?;
        this.edge.graph.write_updates()?;
        Ok(())
    };
    blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.edge.update_embeddings().await;
    Ok(true)
}
// Updates the edge's metadata (optionally on one layer), writes the update,
// then refreshes its embeddings.
async fn update_metadata(
    &self,
    properties: Vec<GqlPropertyInput>,
    layer: Option<String>,
) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<(), GraphError> {
        let props = as_properties(properties)?;
        this.edge.update_metadata(props, layer.as_str())?;
        this.edge.graph.write_updates()?;
        Ok(())
    };
    blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.edge.update_embeddings().await;
    Ok(true)
}
// Adds temporal property updates to the edge at `time` (none when `properties`
// is absent, optionally on one layer), writes the update, then refreshes its
// embeddings.
async fn add_updates(
    &self,
    time: i64,
    properties: Option<Vec<GqlPropertyInput>>,
    layer: Option<String>,
) -> Result<bool, GraphError> {
    let this = self.clone();
    let write_task = move || -> Result<(), GraphError> {
        let props = as_properties(properties.unwrap_or_default())?;
        this.edge.add_updates(time, props, layer.as_str())?;
        this.edge.graph.write_updates()?;
        Ok(())
    };
    blocking_write(write_task).await?;
    // Embedding refresh is best effort: its result is deliberately ignored.
    let _ = self.edge.update_embeddings().await;
    Ok(true)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{config::app_config::AppConfig, data::Data};
use itertools::Itertools;
use raphtory::{
db::api::view::MaterializedGraph,
vectors::{
custom::{serve_custom_embedding, EmbeddingServer},
embeddings::EmbeddingResult,
storage::OpenAIEmbeddings,
template::DocumentTemplate,
Embedding,
},
};
use tempfile::tempdir;
/// Test embedding function: ignores the text and returns the vector `[1.0]`.
fn fake_embedding(_: &str) -> Vec<f32> {
    Vec::from([1.0_f32])
}
/// Builds an empty graph converted into a `MaterializedGraph`.
fn create_test_graph() -> MaterializedGraph {
    Graph::new().into()
}
/// Sets up a vectorised, empty graph in a temporary directory and returns a
/// mutation handle for it.
///
/// A custom embedding server backed by `fake_embedding` is started on `port`.
/// The temp dir and the server are returned so the caller keeps them alive and
/// can stop the server.
async fn create_mutable_graph(
    port: u16,
) -> (GqlMutableGraph, tempfile::TempDir, EmbeddingServer) {
    let empty_graph = create_test_graph();
    let tmp_dir = tempdir().unwrap();
    let app_config = AppConfig::default();
    let data = Data::new(tmp_dir.path(), &app_config);
    let graph_name = "test_graph";
    data.insert_graph(graph_name, empty_graph).await.unwrap();

    let template = DocumentTemplate {
        node_template: Some("{{ name }} is a {{ node_type }}".to_string()),
        edge_template: Some("{{ src.name }} appeared with {{ dst.name}}".to_string()),
    };
    let embedding_server = serve_custom_embedding(None, port, fake_embedding).await;
    let embeddings_config = OpenAIEmbeddings::new("whatever", format!("http://localhost:{port}"));
    let vector_cache = data.vector_cache.resolve().await.unwrap();
    let model = vector_cache.openai(embeddings_config.into()).await.unwrap();

    let folder = ExistingGraphFolder::try_from(tmp_dir.path().to_path_buf(), graph_name).unwrap();
    data.vectorise_folder(&folder, &template, model)
        .await
        .unwrap();

    let (graph_with_vectors, path) = data.get_graph(graph_name).await.unwrap();
    (
        GqlMutableGraph::new(path, graph_with_vectors),
        tmp_dir,
        embedding_server,
    )
}
// An empty batch succeeds and reports `true`.
#[tokio::test]
async fn test_add_nodes_empty_list() {
    let (mutable_graph, _tmp_dir, embedding_server) = create_mutable_graph(1745).await;
    let outcome = mutable_graph.add_nodes(Vec::new()).await;
    assert!(outcome.is_ok());
    assert!(outcome.unwrap());
    embedding_server.stop().await;
}
#[tokio::test]
async fn test_add_nodes_simple() {
let (mutable_graph, _tmp_dir, es) = create_mutable_graph(1746).await;
let nodes = vec![
NodeAddition {
name: "node1".to_string(),
node_type: Some("test_node_type".to_string()),
metadata: None,
updates: Some(vec![TemporalPropertyInput {
time: 0,
properties: None,
}]),
},
NodeAddition {
name: "node2".to_string(),
node_type: Some("test_node_type".to_string()),
metadata: None,
updates: Some(vec![TemporalPropertyInput {
time: 0,
properties: None,
}]),
},
];
let result = mutable_graph.add_nodes(nodes).await;
assert!(result.is_ok());
assert!(result.unwrap());
let embedding = fake_embedding("node1");
let limit = 5;
let result = mutable_graph
.graph
.vectors
.unwrap()
.nodes_by_similarity(&embedding.into(), limit, None)
.execute()
.await;
assert!(result.is_ok());
assert!(result.unwrap().get_documents().await.unwrap().len() == 2);
es.stop().await;
}
#[tokio::test]
async fn test_add_nodes_with_properties() {
let (mutable_graph, _tmp_dir, es) = create_mutable_graph(1747).await;
let nodes = vec![
NodeAddition {
name: "complex_node_1".to_string(),
node_type: Some("employee".to_string()),
metadata: Some(vec![GqlPropertyInput {
key: "department".to_string(),
value: Value::Str("Sales".to_string()),
}]),
updates: Some(vec![
TemporalPropertyInput {
time: 0,
properties: Some(vec![GqlPropertyInput {
key: "salary".to_string(),
value: Value::F64(50000.0),
}]),
},
TemporalPropertyInput {
time: 0,
properties: Some(vec![GqlPropertyInput {
key: "salary".to_string(),
value: Value::F64(55000.0),
}]),
},
]),
},
NodeAddition {
name: "complex_node_2".to_string(),
node_type: Some("employee".to_string()),
metadata: None,
updates: Some(vec![TemporalPropertyInput {
time: 0,
properties: None,
}]),
},
NodeAddition {
name: "complex_node_3".to_string(),
node_type: Some("employee".to_string()),
metadata: None,
updates: Some(vec![TemporalPropertyInput {
time: 0,
properties: Some(vec![GqlPropertyInput {
key: "salary".to_string(),
value: Value::F64(55000.0),
}]),
}]),
},
];
let result = mutable_graph.add_nodes(nodes).await;
assert!(result.is_ok());
assert!(result.unwrap());
let embedding = fake_embedding("complex_node_1");
let limit = 5;
let result = mutable_graph
.graph
.vectors
.unwrap()
.nodes_by_similarity(&embedding.into(), limit, None)
.execute()
.await;
assert!(result.is_ok());
assert!(result.unwrap().get_documents().await.unwrap().len() == 3);
es.stop().await;
}
#[tokio::test]
async fn test_add_edges_simple() {
let (mutable_graph, _tmp_dir, es) = create_mutable_graph(1748).await;
let nodes = vec![
NodeAddition {
name: "node1".to_string(),
node_type: Some("person".to_string()),
metadata: None,
updates: Some(vec![TemporalPropertyInput {
time: 0,
properties: None,
}]),
},
NodeAddition {
name: "node2".to_string(),
node_type: Some("person".to_string()),
metadata: None,
updates: Some(vec![TemporalPropertyInput {
time: 0,
properties: None,
}]),
},
];
let result = mutable_graph.add_nodes(nodes).await;
assert!(result.is_ok());
let edges = vec![
EdgeAddition {
src: "node1".to_string(),
dst: "node2".to_string(),
layer: Some("friendship".to_string()),
metadata: Some(vec![GqlPropertyInput {
key: "strength".to_string(),
value: Value::F64(0.8),
}]),
updates: Some(vec![TemporalPropertyInput {
time: 0,
properties: None,
}]),
},
EdgeAddition {
src: "node2".to_string(),
dst: "node1".to_string(),
layer: Some("friendship".to_string()),
metadata: None,
updates: Some(vec![TemporalPropertyInput {
time: 0,
properties: None,
}]),
},
];
let result = mutable_graph.add_edges(edges).await;
assert!(result.is_ok());
assert!(result.unwrap());
let embedding = fake_embedding("node1 appeared with node2");
let limit = 5;
let result = mutable_graph
.graph
.vectors
.unwrap()
.edges_by_similarity(&embedding.into(), limit, None)
.execute()
.await;
assert!(result.is_ok());
assert!(result.unwrap().get_documents().await.unwrap().len() == 2);
es.stop().await;
}
}