use futures::{future, stream};
use serde::Serialize;
use crate::{
http::GraphQLRequest,
parser::Spanning,
types::base::{is_excluded, merge_key_into, GraphQLType, GraphQLValue},
Arguments, BoxFuture, DefaultScalarValue, ExecutionError, Executor, FieldError, Object,
ScalarValue, Selection, Value, ValuesStream,
};
#[derive(Debug, Serialize)]
pub struct ExecutionOutput<S> {
pub data: Value<S>,
#[serde(bound(serialize = "S: ScalarValue"))]
pub errors: Vec<ExecutionError<S>>,
}
impl<S> ExecutionOutput<S> {
pub fn from_data(data: Value<S>) -> Self {
Self {
data,
errors: vec![],
}
}
}
pub trait SubscriptionCoordinator<'a, CtxT, S>
where
S: ScalarValue,
{
type Connection: SubscriptionConnection<S>;
type Error;
fn subscribe(
&'a self,
_: &'a GraphQLRequest<S>,
_: &'a CtxT,
) -> BoxFuture<'a, Result<Self::Connection, Self::Error>>;
}
pub trait SubscriptionConnection<S>: futures::Stream<Item = ExecutionOutput<S>> {}
pub trait GraphQLSubscriptionValue<S = DefaultScalarValue>: GraphQLValue<S> + Sync
where
Self::TypeInfo: Sync,
Self::Context: Sync,
S: ScalarValue + Send + Sync,
{
fn resolve_into_stream<'s, 'i, 'ref_e, 'e, 'res, 'f>(
&'s self,
info: &'i Self::TypeInfo,
executor: &'ref_e Executor<'ref_e, 'e, Self::Context, S>,
) -> BoxFuture<'f, Result<Value<ValuesStream<'res, S>>, FieldError<S>>>
where
'e: 'res,
'i: 'res,
's: 'f,
'ref_e: 'f,
'res: 'f,
{
if executor.current_selection_set().is_some() {
Box::pin(
async move { Ok(resolve_selection_set_into_stream(self, info, executor).await) },
)
} else {
panic!("resolve_into_stream() must be implemented");
}
}
fn resolve_field_into_stream<'s, 'i, 'ft, 'args, 'e, 'ref_e, 'res, 'f>(
&'s self,
_: &'i Self::TypeInfo,
_: &'ft str,
_: Arguments<'args, S>,
_: &'ref_e Executor<'ref_e, 'e, Self::Context, S>,
) -> BoxFuture<'f, Result<Value<ValuesStream<'res, S>>, FieldError<S>>>
where
's: 'f,
'i: 'res,
'ft: 'f,
'args: 'f,
'ref_e: 'f,
'res: 'f,
'e: 'res,
{
panic!("resolve_field_into_stream must be implemented");
}
fn resolve_into_type_stream<'s, 'i, 'tn, 'e, 'ref_e, 'res, 'f>(
&'s self,
info: &'i Self::TypeInfo,
type_name: &'tn str,
executor: &'ref_e Executor<'ref_e, 'e, Self::Context, S>,
) -> BoxFuture<'f, Result<Value<ValuesStream<'res, S>>, FieldError<S>>>
where
'i: 'res,
'e: 'res,
's: 'f,
'tn: 'f,
'ref_e: 'f,
'res: 'f,
{
Box::pin(async move {
if self.type_name(info) == Some(type_name) {
self.resolve_into_stream(info, executor).await
} else {
panic!("resolve_into_type_stream must be implemented");
}
})
}
}
crate::sa::assert_obj_safe!(GraphQLSubscriptionValue<Context = (), TypeInfo = ()>);
pub trait GraphQLSubscriptionType<S = DefaultScalarValue>:
GraphQLSubscriptionValue<S> + GraphQLType<S>
where
Self::Context: Sync,
Self::TypeInfo: Sync,
S: ScalarValue + Send + Sync,
{
}
impl<S, T> GraphQLSubscriptionType<S> for T
where
T: GraphQLSubscriptionValue<S> + GraphQLType<S> + ?Sized,
T::Context: Sync,
T::TypeInfo: Sync,
S: ScalarValue + Send + Sync,
{
}
pub(crate) fn resolve_selection_set_into_stream<'i, 'inf, 'ref_e, 'e, 'res, 'fut, T, S>(
instance: &'i T,
info: &'inf T::TypeInfo,
executor: &'ref_e Executor<'ref_e, 'e, T::Context, S>,
) -> BoxFuture<'fut, Value<ValuesStream<'res, S>>>
where
'inf: 'res,
'e: 'res,
'i: 'fut,
'e: 'fut,
'ref_e: 'fut,
'res: 'fut,
T: GraphQLSubscriptionValue<S> + ?Sized,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
{
Box::pin(resolve_selection_set_into_stream_recursive(
instance, info, executor,
))
}
async fn resolve_selection_set_into_stream_recursive<'i, 'inf, 'ref_e, 'e, 'res, T, S>(
instance: &'i T,
info: &'inf T::TypeInfo,
executor: &'ref_e Executor<'ref_e, 'e, T::Context, S>,
) -> Value<ValuesStream<'res, S>>
where
T: GraphQLSubscriptionValue<S> + ?Sized,
T::TypeInfo: Sync,
T::Context: Sync,
S: ScalarValue + Send + Sync,
'inf: 'res,
'e: 'res,
{
let selection_set = executor
.current_selection_set()
.expect("Executor's selection set is none");
let mut object: Object<ValuesStream<'res, S>> = Object::with_capacity(selection_set.len());
let meta_type = executor
.schema()
.concrete_type_by_name(
instance
.type_name(info)
.expect("Resolving named type's selection set")
.as_ref(),
)
.expect("Type not found in schema");
for selection in selection_set {
match selection {
Selection::Field(Spanning {
item: ref f,
start: ref start_pos,
..
}) => {
if is_excluded(&f.directives, &executor.variables()) {
continue;
}
let response_name = f.alias.as_ref().unwrap_or(&f.name).item;
if f.name.item == "__typename" {
let typename =
Value::scalar(instance.concrete_type_name(executor.context(), info));
object.add_field(
response_name,
Value::Scalar(Box::pin(stream::once(future::ok(typename)))),
);
continue;
}
let meta_field = meta_type
.field_by_name(f.name.item)
.unwrap_or_else(|| {
panic!(
"Field {} not found on type {:?}",
f.name.item,
meta_type.name(),
)
})
.clone();
let exec_vars = executor.variables();
let sub_exec = executor.field_sub_executor(
response_name,
f.name.item,
*start_pos,
f.selection_set.as_ref().map(|x| &x[..]),
);
let args = Arguments::new(
f.arguments.as_ref().map(|m| {
m.item
.iter()
.map(|&(ref k, ref v)| (k.item, v.item.clone().into_const(&exec_vars)))
.collect()
}),
&meta_field.arguments,
);
let is_non_null = meta_field.field_type.is_non_null();
let res = instance
.resolve_field_into_stream(info, f.name.item, args, &sub_exec)
.await;
match res {
Ok(Value::Null) if is_non_null => {
return Value::Null;
}
Ok(v) => merge_key_into(&mut object, response_name, v),
Err(e) => {
sub_exec.push_error_at(e, *start_pos);
if meta_field.field_type.is_non_null() {
return Value::Null;
}
object.add_field(f.name.item, Value::Null);
}
}
}
Selection::FragmentSpread(Spanning {
item: ref spread,
start: ref start_pos,
..
}) => {
if is_excluded(&spread.directives, &executor.variables()) {
continue;
}
let fragment = executor
.fragment_by_name(spread.name.item)
.expect("Fragment could not be found");
let sub_exec = executor.type_sub_executor(
Some(fragment.type_condition.item),
Some(&fragment.selection_set[..]),
);
let obj = instance
.resolve_into_type_stream(info, fragment.type_condition.item, &sub_exec)
.await;
match obj {
Ok(val) => {
match val {
Value::Object(o) => {
for (k, v) in o {
merge_key_into(&mut object, &k, v);
}
}
_ => unreachable!(),
}
}
Err(e) => sub_exec.push_error_at(e, *start_pos),
}
}
Selection::InlineFragment(Spanning {
item: ref fragment,
start: ref start_pos,
..
}) => {
if is_excluded(&fragment.directives, &executor.variables()) {
continue;
}
let sub_exec = executor.type_sub_executor(
fragment.type_condition.as_ref().map(|c| c.item),
Some(&fragment.selection_set[..]),
);
if let Some(ref type_condition) = fragment.type_condition {
let sub_result = instance
.resolve_into_type_stream(info, type_condition.item, &sub_exec)
.await;
if let Ok(Value::Object(obj)) = sub_result {
for (k, v) in obj {
merge_key_into(&mut object, &k, v);
}
} else if let Err(e) = sub_result {
sub_exec.push_error_at(e, *start_pos);
}
} else if let Some(type_name) = meta_type.name() {
let sub_result = instance
.resolve_into_type_stream(info, type_name, &sub_exec)
.await;
if let Ok(Value::Object(obj)) = sub_result {
for (k, v) in obj {
merge_key_into(&mut object, &k, v);
}
} else if let Err(e) = sub_result {
sub_exec.push_error_at(e, *start_pos);
}
} else {
return Value::Null;
}
}
}
}
Value::Object(object)
}