async_graphql/
subscription.rs1use std::{borrow::Cow, pin::Pin};
2
3use futures_util::stream::{Stream, StreamExt};
4
5use crate::{
6 parser::types::Selection, registry, registry::Registry, Context, ContextSelectionSet,
7 PathSegment, Response, ServerError, ServerResult,
8};
9
10pub trait SubscriptionType: Send + Sync {
12 fn type_name() -> Cow<'static, str>;
14
15 fn qualified_type_name() -> String {
17 format!("{}!", Self::type_name())
18 }
19
20 fn create_type_info(registry: &mut registry::Registry) -> String;
22
23 #[doc(hidden)]
25 fn is_empty() -> bool {
26 false
27 }
28
29 #[doc(hidden)]
30 fn create_field_stream<'a>(
31 &'a self,
32 ctx: &'a Context<'_>,
33 ) -> Option<Pin<Box<dyn Stream<Item = Response> + Send + 'a>>>;
34}
35
36pub(crate) type BoxFieldStream<'a> = Pin<Box<dyn Stream<Item = Response> + 'a + Send>>;
37
38pub(crate) fn collect_subscription_streams<'a, T: SubscriptionType + 'static>(
39 ctx: &ContextSelectionSet<'a>,
40 root: &'a T,
41 streams: &mut Vec<BoxFieldStream<'a>>,
42) -> ServerResult<()> {
43 for selection in &ctx.item.node.items {
44 if let Selection::Field(field) = &selection.node {
45 streams.push(Box::pin({
46 let ctx = ctx.clone();
47 async_stream::stream! {
48 let ctx = ctx.with_field(field);
49 let field_name = ctx.item.node.response_key().node.clone();
50 let stream = root.create_field_stream(&ctx);
51 if let Some(mut stream) = stream {
52 while let Some(resp) = stream.next().await {
53 yield resp;
54 }
55 } else {
56 let err = ServerError::new(format!(r#"Cannot query field "{}" on type "{}"."#, field_name, T::type_name()), Some(ctx.item.pos))
57 .with_path(vec![PathSegment::Field(field_name.to_string())]);
58 yield Response::from_errors(vec![err]);
59 }
60 }
61 }))
62 }
63 }
64 Ok(())
65}
66
67impl<T: SubscriptionType> SubscriptionType for &T {
68 fn type_name() -> Cow<'static, str> {
69 T::type_name()
70 }
71
72 fn create_type_info(registry: &mut Registry) -> String {
73 T::create_type_info(registry)
74 }
75
76 fn create_field_stream<'a>(
77 &'a self,
78 ctx: &'a Context<'_>,
79 ) -> Option<Pin<Box<dyn Stream<Item = Response> + Send + 'a>>> {
80 T::create_field_stream(*self, ctx)
81 }
82}