use crate::{
auth::ContextValidation,
data::Data,
model::{
graph::{
collection::GqlCollection, graph::GqlGraph, index::IndexSpecInput,
mutable_graph::GqlMutableGraph, namespace::Namespace,
vectorised_graph::GqlVectorisedGraph,
},
plugins::{mutation_plugin::MutationPlugin, query_plugin::QueryPlugin},
},
paths::{valid_path, ExistingGraphFolder},
rayon::blocking_compute,
url_encode::{url_decode_graph, url_encode_graph},
};
use async_graphql::Context;
use dynamic_graphql::{
App, Enum, InputObject, Mutation, MutationFields, MutationRoot, OneOfInput, ResolvedObject,
ResolvedObjectFields, Result, Upload,
};
use raphtory::{
db::{api::view::MaterializedGraph, graph::views::deletion_graph::PersistentGraph},
errors::{GraphError, GraphResult, InvalidPathReason},
prelude::*,
serialise::InternalStableDecode,
vectors::{
cache::CachedEmbeddingModel,
storage::OpenAIEmbeddings,
template::{DocumentTemplate, DEFAULT_EDGE_TEMPLATE, DEFAULT_NODE_TEMPLATE},
},
version,
};
#[cfg(feature = "storage")]
use raphtory_storage::{core_ops::CoreGraphOps, graph::graph::GraphStorage};
use std::{
error::Error,
fmt::{Display, Formatter},
io::Read,
sync::Arc,
};
use zip::ZipArchive;
pub(crate) mod graph;
pub mod plugins;
pub(crate) mod schema;
pub(crate) mod sorting;
// GraphQL input describing an OpenAI embedding model. Every field is copied
// unchanged into `OpenAIEmbeddings` by `EmbeddingModel::cache`.
#[derive(InputObject, Debug, Clone, Default)]
pub struct OpenAIConfig {
// Model identifier.
// NOTE(review): the derived `Default` leaves this as an empty string, and
// `vectorise_graph` falls back to that default when no model is supplied —
// confirm an empty model name is really the intended fallback.
model: String,
// presumably the base URL of the embeddings endpoint — TODO confirm
api_base: Option<String>,
// presumably the name of the environment variable holding the API key — TODO confirm
api_key_env: Option<String>,
// Forwarded as-is to `OpenAIEmbeddings::org_id`.
org_id: Option<String>,
// Forwarded as-is to `OpenAIEmbeddings::project_id`.
project_id: Option<String>,
}
// One-of GraphQL input selecting which embedding backend to use.
// OpenAI is currently the only variant.
#[derive(OneOfInput, Clone, Debug)]
pub enum EmbeddingModel {
// Use OpenAI embeddings configured by the wrapped `OpenAIConfig`.
OpenAI(OpenAIConfig),
}
impl EmbeddingModel {
    /// Converts this model selection into a [`CachedEmbeddingModel`].
    ///
    /// The configuration is turned into an `OpenAIEmbeddings` value (with
    /// `dim` left as `None`) and handed to the vector cache stored in the
    /// request's [`Data`].
    ///
    /// # Errors
    ///
    /// Propagates any error from resolving the vector cache or from
    /// `openai(...)` on that cache.
    async fn cache<'a>(self, ctx: &Context<'a>) -> GraphResult<CachedEmbeddingModel> {
        let data = ctx.data_unchecked::<Data>();
        match self {
            Self::OpenAI(config) => {
                // Field-for-field copy of the GraphQL input into the embeddings config.
                let embeddings = OpenAIEmbeddings {
                    model: config.model,
                    api_base: config.api_base,
                    api_key_env: config.api_key_env,
                    org_id: config.org_id,
                    project_id: config.project_id,
                    dim: None,
                };
                let cache = data.vector_cache.resolve().await?;
                cache.openai(embeddings.into()).await
            }
        }
    }
}
/// Runs the blocking closure `f` via `tokio::task::spawn_blocking` and returns
/// its result once the task has finished.
///
/// # Panics
///
/// Panics if awaiting the spawned task yields an error instead of a value.
pub(crate) async fn blocking_io<F, R>(f: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    let handle = tokio::task::spawn_blocking(f);
    handle.await.unwrap()
}
/// Error value signalling that a requested graph does not exist.
#[derive(Debug)]
pub struct MissingGraph;

impl Display for MissingGraph {
    /// Writes the fixed message `Graph does not exist`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Graph does not exist")
    }
}

/// `MissingGraph` carries no underlying cause, so the default `Error` methods suffice.
impl Error for MissingGraph {}
#[derive(thiserror::Error, Debug)]
pub enum GqlGraphError {
#[error("Disk Graph is immutable")]
ImmutableDiskGraph,
#[error("Graph does exists at path {0}")]
GraphDoesNotExists(String),
#[error("Failed to load graph")]
FailedToLoadGraph,
#[error("Invalid namespace: {0}")]
InvalidNamespace(String),
#[error("Failed to create dir {0}")]
FailedToCreateDir(String),
}
// Kind of graph that `new_graph` should create; exposed in the schema under
// the name `GraphType`.
#[derive(Enum)]
#[graphql(name = "GraphType")]
pub enum GqlGraphType {
// `new_graph` builds this from `PersistentGraph::new()`.
Persistent,
// `new_graph` builds this from `Graph::new()`.
Event,
}
// Root object of the GraphQL query schema (marked `#[graphql(root)]`); its
// resolvers are defined in the `impl QueryRoot` block.
#[derive(ResolvedObject)]
#[graphql(root)]
pub(crate) struct QueryRoot;
// One-of GraphQL input choosing the document template used for nodes or
// edges when vectorising a graph; interpreted by `resolve`.
#[derive(OneOfInput, Clone, Debug)]
pub enum Template {
// `true` selects the default template, `false` disables the template.
Enabled(bool),
// Use the supplied template text verbatim.
Custom(String),
}
/// Resolves an optional [`Template`] choice into concrete template text.
///
/// Returns `None` when no choice was given or when the template was
/// explicitly disabled, a copy of `default` for `Enabled(true)`, and the
/// caller-provided text for `Custom`.
fn resolve(template: Option<Template>, default: &str) -> Option<String> {
    template.and_then(|choice| match choice {
        Template::Custom(text) => Some(text),
        Template::Enabled(enabled) => enabled.then(|| default.to_owned()),
    })
}
#[ResolvedObjectFields]
impl QueryRoot {
// Returns a fixed greeting string.
async fn hello() -> &'static str {
    const GREETING: &str = "Hello world from raphtory-graphql";
    GREETING
}
// Looks up the graph stored at `path` and wraps it, together with its
// folder, in a `GqlGraph`. Lookup failures are returned as the query error.
async fn graph<'a>(ctx: &Context<'a>, path: &str) -> Result<GqlGraph> {
    let data = ctx.data_unchecked::<Data>();
    let (loaded, folder) = data.get_graph(path).await?;
    Ok(GqlGraph::new(folder, loaded.graph))
}
// Returns a mutable handle for the graph at `path`.
// Fails early unless the request context grants write access.
async fn update_graph<'a>(ctx: &Context<'a>, path: String) -> Result<GqlMutableGraph> {
    ctx.require_write_access()?;
    let data = ctx.data_unchecked::<Data>();
    let (loaded, folder) = data.get_graph(path.as_str()).await?;
    Ok(GqlMutableGraph::new(folder, loaded))
}
async fn vectorise_graph<'a>(
ctx: &Context<'a>,
path: String,
model: Option<EmbeddingModel>,
nodes: Option<Template>,
edges: Option<Template>,
) -> Result<bool> {
ctx.require_write_access()?;
let data = ctx.data_unchecked::<Data>();
let template = DocumentTemplate {
node_template: resolve(nodes, DEFAULT_NODE_TEMPLATE),
edge_template: resolve(edges, DEFAULT_EDGE_TEMPLATE),
};
let cached_model = model
.unwrap_or(EmbeddingModel::OpenAI(Default::default()))
.cache(ctx)
.await?;
let folder = ExistingGraphFolder::try_from(data.work_dir.clone(), &path)?;
data.vectorise_folder(&folder, &template, cached_model)
.await?;
Ok(true)
}
// Returns the vectorised view of the graph at `path`.
// Yields `None` both when the graph cannot be loaded (the load error is
// discarded) and when the graph has no vectors attached.
async fn vectorised_graph<'a>(ctx: &Context<'a>, path: &str) -> Option<GqlVectorisedGraph> {
    let data = ctx.data_unchecked::<Data>();
    match data.get_graph(path).await {
        Ok((loaded, _)) => loaded.vectors.map(Into::into),
        Err(_) => None,
    }
}
// Collects every namespace reported by `get_all_namespaces` on the
// namespace rooted at the server's work directory.
async fn namespaces<'a>(ctx: &Context<'a>) -> GqlCollection<Namespace> {
    let work_dir = &ctx.data_unchecked::<Data>().work_dir;
    let root = Namespace::new(work_dir.clone(), work_dir.clone());
    let all = root.get_all_namespaces();
    GqlCollection::new(all.into())
}
// Resolves `path` (validated against the work directory) to a namespace.
// Returns `NamespaceDoesNotExist` when the validated directory is absent,
// or the validation error when `path` itself is rejected.
async fn namespace<'a>(
    ctx: &Context<'a>,
    path: String,
) -> Result<Namespace, InvalidPathReason> {
    let data = ctx.data_unchecked::<Data>();
    let current_dir = valid_path(data.work_dir.clone(), path.as_str(), true)?;
    if !current_dir.exists() {
        return Err(InvalidPathReason::NamespaceDoesNotExist(path));
    }
    Ok(Namespace::new(data.work_dir.clone(), current_dir))
}
// Returns the namespace whose base and current directory are both the
// server's work directory.
async fn root<'a>(ctx: &Context<'a>) -> Namespace {
    let work_dir = &ctx.data_unchecked::<Data>().work_dir;
    Namespace::new(work_dir.clone(), work_dir.clone())
}
// Entry point for query plugins; hands back a default-constructed `QueryPlugin`.
async fn plugins<'a>() -> QueryPlugin {
    QueryPlugin::default()
}
// Loads the graph at `path` and returns it encoded as a string by
// `url_encode_graph`. Load and encoding failures are both surfaced as
// `Arc<GraphError>`.
async fn receive_graph<'a>(ctx: &Context<'a>, path: String) -> Result<String, Arc<GraphError>> {
    let data = ctx.data_unchecked::<Data>();
    let graph = data.get_graph(path.as_str()).await?.0.graph.clone();
    Ok(url_encode_graph(graph)?)
}
// Reports the value of `raphtory::version()` as an owned string.
async fn version<'a>(_ctx: &Context<'a>) -> String {
    let current = version();
    String::from(current)
}
}
// Marker type registered as the schema's mutation root.
#[derive(MutationRoot)]
pub(crate) struct MutRoot;
// Mutation fields attached to `MutRoot`; the resolvers are defined in the
// `impl Mut` block.
#[derive(Mutation)]
pub(crate) struct Mut(MutRoot);
#[MutationFields]
impl Mut {
// Entry point for mutation plugins; hands back a default-constructed `MutationPlugin`.
async fn plugins<'a>(_ctx: &Context<'a>) -> MutationPlugin {
    MutationPlugin::default()
}
// Deletes the graph stored at `path`; returns `true` on success.
// NOTE(review): unlike `update_graph` / `vectorise_graph`, the mutation
// resolvers do not call `ctx.require_write_access()` — presumably mutations
// are gated elsewhere; confirm.
async fn delete_graph<'a>(ctx: &Context<'a>, path: String) -> Result<bool> {
    ctx.data_unchecked::<Data>()
        .delete_graph(path.as_str())
        .await?;
    Ok(true)
}
// Creates an empty graph of the requested type and stores it at `path`.
// Returns `true` on success; materialisation and insertion errors are
// propagated.
async fn new_graph<'a>(
    ctx: &Context<'a>,
    path: String,
    graph_type: GqlGraphType,
) -> Result<bool> {
    let data = ctx.data_unchecked::<Data>();
    let materialised = match graph_type {
        GqlGraphType::Event => Graph::new().materialize(),
        GqlGraphType::Persistent => PersistentGraph::new().materialize(),
    };
    let empty_graph = materialised?;
    data.insert_graph(&path, empty_graph).await?;
    Ok(true)
}
// Moves a graph by copying it to `new_path` and then deleting `path`.
// The two steps are independent: if the delete fails after a successful
// copy, the graph remains available under both paths.
async fn move_graph<'a>(ctx: &Context<'a>, path: &str, new_path: &str) -> Result<bool> {
    Self::copy_graph(ctx, path, new_path).await?;
    ctx.data_unchecked::<Data>().delete_graph(path).await?;
    Ok(true)
}
// Copies the graph at `path` to `new_path`; returns `true` on success.
// With the `storage` feature enabled, disk-backed graphs are rejected with
// `GqlGraphError::ImmutableDiskGraph`.
async fn copy_graph<'a>(ctx: &Context<'a>, path: &str, new_path: &str) -> Result<bool> {
    let data = ctx.data_unchecked::<Data>();
    let source = data.get_graph(path).await?.0.graph;
    #[cfg(feature = "storage")]
    if matches!(source.core_graph(), GraphStorage::Disk(_)) {
        return Err(GqlGraphError::ImmutableDiskGraph.into());
    }
    data.insert_graph(new_path, source).await?;
    Ok(true)
}
async fn upload_graph<'a>(
ctx: &Context<'a>,
path: String,
graph: Upload,
overwrite: bool,
) -> Result<String> {
let data = ctx.data_unchecked::<Data>();
let graph = {
let in_file = graph.value(ctx)?.content;
let mut archive = ZipArchive::new(in_file)?;
let mut entry = archive.by_name("graph")?;
let mut buf = vec![];
entry.read_to_end(&mut buf)?;
MaterializedGraph::decode_from_bytes(&buf)?
};
if overwrite {
let _ignored = data.delete_graph(&path).await;
}
data.insert_graph(&path, graph).await?;
Ok(path)
}
// Stores a graph supplied as a string (decoded by `url_decode_graph`) at
// `path` and returns `path`.
// When `overwrite` is set, an existing graph at `path` is deleted first and
// any error from that delete is deliberately ignored.
async fn send_graph<'a>(
    ctx: &Context<'a>,
    path: &str,
    graph: String,
    overwrite: bool,
) -> Result<String> {
    let data = ctx.data_unchecked::<Data>();
    let decoded: MaterializedGraph = url_decode_graph(graph)?;
    if overwrite {
        // Best effort: the delete may fail, e.g. when there is nothing to remove.
        let _ = data.delete_graph(path).await;
    }
    data.insert_graph(path, decoded).await?;
    Ok(String::from(path))
}
// Materialises the subgraph of `parent_path` induced by `nodes` and stores
// it at `new_path`, which is also the return value.
// When `overwrite` is set, an existing graph at `new_path` is deleted first
// and any error from that delete is deliberately ignored.
async fn create_subgraph<'a>(
    ctx: &Context<'a>,
    parent_path: &str,
    nodes: Vec<String>,
    new_path: String,
    overwrite: bool,
) -> Result<String> {
    let data = ctx.data_unchecked::<Data>();
    let parent_graph = data.get_graph(parent_path).await?.0.graph;
    // Materialisation is handed to `blocking_compute` rather than run inline.
    let materialise = move || parent_graph.subgraph(nodes).materialize();
    let new_subgraph = blocking_compute(materialise).await?;
    if overwrite {
        let _ = data.delete_graph(&new_path).await;
    }
    data.insert_graph(&new_path, new_subgraph).await?;
    Ok(new_path)
}
// Builds a search index for the graph at `path`; returns `true` on success.
//
// `index_spec`, when given, is converted with `to_index_spec` and used for
// the `*_with_spec` variants; `in_ram` selects the in-RAM variants.
// Without the `search` feature this always fails with
// `GraphError::IndexingNotSupported`.
async fn create_index<'a>(
    ctx: &Context<'a>,
    path: &str,
    index_spec: Option<IndexSpecInput>,
    in_ram: bool,
) -> Result<bool> {
    #[cfg(feature = "search")]
    {
        let data = ctx.data_unchecked::<Data>();
        let graph = data.get_graph(path).await?.0.graph;
        let outcome = match (index_spec, in_ram) {
            (Some(input), true) => {
                let spec = input.to_index_spec(graph.clone())?;
                graph.create_index_in_ram_with_spec(spec)
            }
            (Some(input), false) => {
                let spec = input.to_index_spec(graph.clone())?;
                graph.create_index_with_spec(spec)
            }
            (None, true) => graph.create_index_in_ram(),
            (None, false) => graph.create_index(),
        };
        outcome?;
        Ok(true)
    }
    #[cfg(not(feature = "search"))]
    {
        Err(GraphError::IndexingNotSupported.into())
    }
}
}
// GraphQL application assembled from the query root, the mutation root and
// the mutation fields.
#[derive(App)]
pub struct App(QueryRoot, MutRoot, Mut);