use crate::algo::ProjectionBuilder;
use crate::algo::algorithms::Algorithm;
use crate::algo::procedures::{
AlgoContext, AlgoProcedure, AlgoResultRow, ProcedureSignature, ValueType,
};
use anyhow::{Result, anyhow};
use futures::stream::{self, BoxStream, StreamExt};
use serde_json::Value;
use std::marker::PhantomData;
pub trait GraphAlgoAdapter: Send + Sync + 'static {
const NAME: &'static str;
type Algo: Algorithm;
fn specific_args() -> Vec<(&'static str, ValueType, Option<Value>)>;
fn yields() -> Vec<(&'static str, ValueType)>;
fn to_config(args: Vec<Value>) -> <Self::Algo as Algorithm>::Config;
fn map_result(result: <Self::Algo as Algorithm>::Result) -> Result<Vec<AlgoResultRow>>;
fn customize_projection(builder: ProjectionBuilder, _args: &[Value]) -> ProjectionBuilder {
builder.include_reverse(Self::include_reverse())
}
fn include_reverse() -> bool {
true
}
}
pub struct GenericAlgoProcedure<A: GraphAlgoAdapter> {
_marker: PhantomData<A>,
}
impl<A: GraphAlgoAdapter> GenericAlgoProcedure<A> {
pub fn new() -> Self {
Self {
_marker: PhantomData,
}
}
}
impl<A: GraphAlgoAdapter> Default for GenericAlgoProcedure<A> {
fn default() -> Self {
Self::new()
}
}
impl<A: GraphAlgoAdapter> AlgoProcedure for GenericAlgoProcedure<A>
where
<A::Algo as Algorithm>::Result: Send + 'static,
{
fn name(&self) -> &str {
A::NAME
}
fn signature(&self) -> ProcedureSignature {
let mut args = vec![
("nodeLabels", ValueType::List),
("relationshipTypes", ValueType::List),
];
let mut optional_args = Vec::new();
for (name, ty, default) in A::specific_args() {
if let Some(def) = default {
optional_args.push((name, ty, def));
} else {
args.push((name, ty));
}
}
ProcedureSignature {
args,
optional_args,
yields: A::yields(),
}
}
fn execute(
&self,
ctx: AlgoContext,
args: Vec<Value>,
) -> BoxStream<'static, Result<AlgoResultRow>> {
let signature = self.signature();
let args = match signature.validate_args(args) {
Ok(a) => a,
Err(e) => return stream::once(async { Err(e) }).boxed(),
};
let node_labels = args[0]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect::<Vec<_>>();
let edge_types = args[1]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect::<Vec<_>>();
let specific_args = args[2..].to_vec();
let stream = async_stream::try_stream! {
let schema = ctx.storage.schema_manager().schema();
for label in &node_labels {
if !schema.labels.contains_key(label) {
Err(anyhow!("Label '{}' not found", label))?;
}
}
for etype in &edge_types {
if !schema.edge_types.contains_key(etype) {
Err(anyhow!("Edge type '{}' not found", etype))?;
}
}
let mut builder = ProjectionBuilder::new(ctx.storage.clone())
.l0_manager(ctx.l0_manager.clone())
.node_labels(&node_labels.iter().map(|s| s.as_str()).collect::<Vec<_>>())
.edge_types(&edge_types.iter().map(|s| s.as_str()).collect::<Vec<_>>());
builder = A::customize_projection(builder, &specific_args);
let projection = builder.build().await?;
let config = A::to_config(specific_args);
let result = tokio::task::spawn_blocking(move || {
A::Algo::run(&projection, config)
}).await?;
let rows = A::map_result(result)?;
for row in rows {
yield row;
}
};
Box::pin(stream)
}
}