async_graphql/http/
multipart_subscribe.rs

1use std::time::Duration;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use futures_util::{FutureExt, Stream, StreamExt, stream::BoxStream};
5use mime::Mime;
6
7use crate::{Response, util::Delay};
8
9static PART_HEADER: Bytes =
10    Bytes::from_static(b"--graphql\r\nContent-Type: application/json\r\n\r\n");
11static EOF: Bytes = Bytes::from_static(b"--graphql--\r\n");
12static CRLF: Bytes = Bytes::from_static(b"\r\n");
13static HEARTBEAT: Bytes = Bytes::from_static(b"{}\r\n");
14
15/// Create a stream for `multipart/mixed` responses.
16///
17/// Reference: <https://www.apollographql.com/docs/router/executing-operations/subscription-multipart-protocol/>
18pub fn create_multipart_mixed_stream<'a>(
19    input: impl Stream<Item = Response> + Send + Unpin + 'a,
20    heartbeat_interval: Duration,
21) -> BoxStream<'a, Bytes> {
22    let mut input = input.fuse();
23    let mut heartbeat_timer = Delay::new(heartbeat_interval).fuse();
24
25    asynk_strim::stream_fn(move |mut yielder| async move {
26        loop {
27            futures_util::select! {
28                item = input.next() => {
29                    match item {
30                        Some(resp) => {
31                            let data = BytesMut::new();
32                            let mut writer = data.writer();
33                            if serde_json::to_writer(&mut writer, &resp).is_err() {
34                                continue;
35                            }
36
37                            yielder.yield_item(PART_HEADER.clone()).await;
38                            yielder.yield_item(writer.into_inner().freeze()).await;
39                            yielder.yield_item(CRLF.clone()).await;
40                        }
41                        None => break,
42                    }
43                }
44                _ = heartbeat_timer => {
45                    heartbeat_timer = Delay::new(heartbeat_interval).fuse();
46                    yielder.yield_item(PART_HEADER.clone()).await;
47                    yielder.yield_item(HEARTBEAT.clone()).await;
48                }
49            }
50        }
51
52        yielder.yield_item(EOF.clone()).await;
53    })
54    .boxed()
55}
56
57fn parse_accept(accept: &str) -> Vec<Mime> {
58    let mut items = accept
59        .split(',')
60        .map(str::trim)
61        .filter_map(|item| {
62            let mime: Mime = item.parse().ok()?;
63            let q = mime
64                .get_param("q")
65                .and_then(|value| Some((value.as_str().parse::<f32>().ok()? * 1000.0) as i32))
66                .unwrap_or(1000);
67            Some((mime, q))
68        })
69        .collect::<Vec<_>>();
70    items.sort_by(|(_, qa), (_, qb)| qb.cmp(qa));
71    items.into_iter().map(|(mime, _)| mime).collect()
72}
73
74/// Check accept is multipart-mixed
75///
76/// # Example header
77///
78/// ```text
79/// Accept: multipart/mixed; boundary="graphql"; subscriptionSpec="1.0"
80/// ```
81///
82/// the value for boundary should always be `graphql`, and the value
83/// for `subscriptionSpec` should always be `1.0`.
84///
85/// Reference: <https://www.apollographql.com/docs/router/executing-operations/subscription-multipart-protocol/>
86pub fn is_accept_multipart_mixed(accept: &str) -> bool {
87    for mime in parse_accept(accept) {
88        if mime.type_() == mime::APPLICATION && mime.subtype() == mime::JSON {
89            return false;
90        }
91
92        if mime.type_() == mime::MULTIPART
93            && mime.subtype() == "mixed"
94            && mime.get_param(mime::BOUNDARY).map(|value| value.as_str()) == Some("graphql")
95            && mime
96                .get_param("subscriptionSpec")
97                .map(|value| value.as_str())
98                == Some("1.0")
99        {
100            return true;
101        }
102    }
103
104    false
105}