// async_graphql/subscription.rs
use std::{borrow::Cow, pin::Pin};

use futures_util::stream::{Stream, StreamExt};

use crate::{
    Context, ContextSelectionSet, PathSegment, Response, ServerError, ServerResult,
    parser::types::Selection, registry, registry::Registry,
};

10pub trait SubscriptionType: Send + Sync {
12 fn type_name() -> Cow<'static, str>;
14
15 fn qualified_type_name() -> String {
17 format!("{}!", Self::type_name())
18 }
19
20 fn create_type_info(registry: &mut registry::Registry) -> String;
22
23 #[doc(hidden)]
25 fn is_empty() -> bool {
26 false
27 }
28
29 #[doc(hidden)]
30 fn create_field_stream<'a>(
31 &'a self,
32 ctx: &'a Context<'_>,
33 ) -> Option<Pin<Box<dyn Stream<Item = Response> + Send + 'a>>>;
34}
35
36pub(crate) type BoxFieldStream<'a> = Pin<Box<dyn Stream<Item = Response> + 'a + Send>>;
37
38pub(crate) fn collect_subscription_streams<'a, T: SubscriptionType + 'static>(
39 ctx: &ContextSelectionSet<'a>,
40 root: &'a T,
41 streams: &mut Vec<BoxFieldStream<'a>>,
42) -> ServerResult<()> {
43 for selection in &ctx.item.node.items {
44 if let Selection::Field(field) = &selection.node {
45 streams.push(Box::pin({
46 let ctx = ctx.clone();
47 asynk_strim::stream_fn(move |mut yielder| async move {
48 let ctx = ctx.with_field(field);
49 let field_name = ctx.item.node.response_key().node.clone();
50 let stream = root.create_field_stream(&ctx);
51 if let Some(mut stream) = stream {
52 while let Some(resp) = stream.next().await {
53 yielder.yield_item(resp).await;
54 }
55 } else {
56 let err = ServerError::new(
57 format!(
58 r#"Cannot query field "{}" on type "{}"."#,
59 field_name,
60 T::type_name()
61 ),
62 Some(ctx.item.pos),
63 )
64 .with_path(vec![PathSegment::Field(field_name.to_string())]);
65 yielder.yield_item(Response::from_errors(vec![err])).await;
66 }
67 })
68 }))
69 }
70 }
71 Ok(())
72}
73
74impl<T: SubscriptionType> SubscriptionType for &T {
75 fn type_name() -> Cow<'static, str> {
76 T::type_name()
77 }
78
79 fn create_type_info(registry: &mut Registry) -> String {
80 T::create_type_info(registry)
81 }
82
83 fn create_field_stream<'a>(
84 &'a self,
85 ctx: &'a Context<'_>,
86 ) -> Option<Pin<Box<dyn Stream<Item = Response> + Send + 'a>>> {
87 T::create_field_stream(*self, ctx)
88 }
89}