async_graphql/http/
multipart_subscribe.rs1use std::time::Duration;
2
3use bytes::{BufMut, Bytes, BytesMut};
4use futures_util::{FutureExt, Stream, StreamExt, stream::BoxStream};
5use mime::Mime;
6
7use crate::{Response, util::Delay};
8
9static PART_HEADER: Bytes =
10 Bytes::from_static(b"--graphql\r\nContent-Type: application/json\r\n\r\n");
11static EOF: Bytes = Bytes::from_static(b"--graphql--\r\n");
12static CRLF: Bytes = Bytes::from_static(b"\r\n");
13static HEARTBEAT: Bytes = Bytes::from_static(b"{}\r\n");
14
15pub fn create_multipart_mixed_stream<'a>(
19 input: impl Stream<Item = Response> + Send + Unpin + 'a,
20 heartbeat_interval: Duration,
21) -> BoxStream<'a, Bytes> {
22 let mut input = input.fuse();
23 let mut heartbeat_timer = Delay::new(heartbeat_interval).fuse();
24
25 asynk_strim::stream_fn(move |mut yielder| async move {
26 loop {
27 futures_util::select! {
28 item = input.next() => {
29 match item {
30 Some(resp) => {
31 let data = BytesMut::new();
32 let mut writer = data.writer();
33 if serde_json::to_writer(&mut writer, &resp).is_err() {
34 continue;
35 }
36
37 yielder.yield_item(PART_HEADER.clone()).await;
38 yielder.yield_item(writer.into_inner().freeze()).await;
39 yielder.yield_item(CRLF.clone()).await;
40 }
41 None => break,
42 }
43 }
44 _ = heartbeat_timer => {
45 heartbeat_timer = Delay::new(heartbeat_interval).fuse();
46 yielder.yield_item(PART_HEADER.clone()).await;
47 yielder.yield_item(HEARTBEAT.clone()).await;
48 }
49 }
50 }
51
52 yielder.yield_item(EOF.clone()).await;
53 })
54 .boxed()
55}
56
57fn parse_accept(accept: &str) -> Vec<Mime> {
58 let mut items = accept
59 .split(',')
60 .map(str::trim)
61 .filter_map(|item| {
62 let mime: Mime = item.parse().ok()?;
63 let q = mime
64 .get_param("q")
65 .and_then(|value| Some((value.as_str().parse::<f32>().ok()? * 1000.0) as i32))
66 .unwrap_or(1000);
67 Some((mime, q))
68 })
69 .collect::<Vec<_>>();
70 items.sort_by(|(_, qa), (_, qb)| qb.cmp(qa));
71 items.into_iter().map(|(mime, _)| mime).collect()
72}
73
74pub fn is_accept_multipart_mixed(accept: &str) -> bool {
87 for mime in parse_accept(accept) {
88 if mime.type_() == mime::APPLICATION && mime.subtype() == mime::JSON {
89 return false;
90 }
91
92 if mime.type_() == mime::MULTIPART
93 && mime.subtype() == "mixed"
94 && mime.get_param(mime::BOUNDARY).map(|value| value.as_str()) == Some("graphql")
95 && mime
96 .get_param("subscriptionSpec")
97 .map(|value| value.as_str())
98 == Some("1.0")
99 {
100 return true;
101 }
102 }
103
104 false
105}