1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use crate::context::QueryEnv;
use crate::parser::query::{Selection, TypeCondition};
use crate::{Context, ContextSelectionSet, ObjectType, Result, Schema, SchemaEnv, Type};
use futures::{Future, Stream};
use std::pin::Pin;
/// Interface required of the `Subscription` type parameter accepted by
/// [`create_subscription_stream`].
///
/// Both methods are marked `#[doc(hidden)]`.
/// NOTE(review): implementations are presumably generated rather than
/// hand-written — confirm against the crate's macros.
#[async_trait::async_trait]
pub trait SubscriptionType: Type {
/// Associated predicate whose default implementation returns `false`.
///
/// NOTE(review): presumably overridden to return `true` by a placeholder
/// subscription type that exposes no fields — confirm against implementors.
#[doc(hidden)]
fn is_empty() -> bool {
false
}
/// Creates the stream that backs a single selected subscription field.
///
/// * `idx` - index supplied by the caller; `create_subscription_stream`
///   passes the position of the field within the selection set being walked.
/// * `ctx` - context for the field the stream is created for.
/// * `schema_env` / `query_env` - owned environment handles handed to the
///   implementation (the caller in this file passes clones).
///
/// On success returns a pinned, boxed, `Send` stream whose items are
/// `Result<serde_json::Value>`; otherwise returns the error produced while
/// creating the stream.
#[doc(hidden)]
async fn create_field_stream(
&self,
idx: usize,
ctx: &Context<'_>,
schema_env: SchemaEnv,
query_env: QueryEnv,
) -> Result<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>>
where
Self: Send + Sync + 'static + Sized;
}
/// Pinned, boxed, `Send` future resolving to `Result<()>`; the return type of
/// [`create_subscription_stream`].
///
/// That function awaits itself recursively, so its future is boxed to give
/// the recursion a nameable, finitely sized return type.
type BoxCreateStreamFuture<'a> = Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
/// Walks the selection set in `ctx` and appends one stream to `streams` for
/// every subscription field that is selected.
///
/// Each selection is handled according to its kind:
///
/// * **Field** – `create_field_stream` is awaited on `schema.subscription`
///   and the resulting stream is pushed onto `streams`.
/// * **Fragment spread** – the fragment is looked up by name in the query
///   document and, if found, its selection set is walked recursively. A
///   spread naming a fragment the document does not contain adds nothing.
/// * **Inline fragment** – its selection set is walked recursively when it
///   has no type condition, or when the condition names
///   `Subscription::type_name()`.
///
/// Any selection for which `ctx.is_skip` returns `true` for its directives is
/// ignored.
///
/// # Parameters
///
/// * `schema` – schema whose `subscription` value creates the field streams.
/// * `environment` – query environment; a clone is passed to every field
///   stream and to every recursive call.
/// * `ctx` – the selection set to walk.
/// * `streams` – output vector the created streams are appended to, in
///   traversal order.
///
/// # Errors
///
/// The returned future resolves to the first error produced by
/// `ctx.is_skip`, by `create_field_stream`, or by a recursive call. Streams
/// pushed before the error remain in `streams`.
pub fn create_subscription_stream<'a, Query, Mutation, Subscription>(
    schema: &'a Schema<Query, Mutation, Subscription>,
    environment: QueryEnv,
    ctx: &'a ContextSelectionSet<'_>,
    streams: &'a mut Vec<Pin<Box<dyn Stream<Item = Result<serde_json::Value>> + Send>>>,
) -> BoxCreateStreamFuture<'a>
where
    Query: ObjectType + Send + Sync + 'static,
    Mutation: ObjectType + Send + Sync + 'static,
    Subscription: SubscriptionType + Send + Sync + 'static + Sized,
{
    Box::pin(async move {
        for (idx, item) in ctx.items.iter().enumerate() {
            match &item.node {
                Selection::Field(field) => {
                    if ctx.is_skip(&field.directives)? {
                        continue;
                    }

                    let field_ctx = ctx.with_field(field);
                    let field_stream = schema
                        .subscription
                        .create_field_stream(
                            idx,
                            &field_ctx,
                            schema.env.clone(),
                            environment.clone(),
                        )
                        .await?;
                    streams.push(field_stream);
                }
                Selection::FragmentSpread(spread) => {
                    if ctx.is_skip(&spread.directives)? {
                        continue;
                    }

                    // Spreads whose fragment is not present in the document are
                    // skipped without raising an error here.
                    if let Some(fragment) = ctx
                        .query_env
                        .document
                        .fragments()
                        .get(spread.fragment_name.as_str())
                    {
                        let fragment_ctx = ctx.with_selection_set(&fragment.selection_set);
                        create_subscription_stream(
                            schema,
                            environment.clone(),
                            &fragment_ctx,
                            streams,
                        )
                        .await?;
                    }
                }
                Selection::InlineFragment(inline) => {
                    if ctx.is_skip(&inline.directives)? {
                        continue;
                    }

                    // An inline fragment applies when it carries no `on` type
                    // condition, or when that condition names the subscription
                    // type itself.
                    let applies = match inline.type_condition.as_ref().map(|cond| &cond.node) {
                        Some(TypeCondition::On(name)) => name.node == Subscription::type_name(),
                        _ => true,
                    };

                    if applies {
                        let inline_ctx = ctx.with_selection_set(&inline.selection_set);
                        create_subscription_stream(
                            schema,
                            environment.clone(),
                            &inline_ctx,
                            streams,
                        )
                        .await?;
                    }
                }
            }
        }

        Ok(())
    })
}