graphql_ws_client/client/
subscription.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_lite::{Stream, StreamExt, future, stream};
7
8use crate::{
9 Error, SubscriptionId, client::production_future::read_from_producer, graphql::GraphqlOperation,
10};
11
12use super::ConnectionCommand;
13
14#[pin_project::pin_project(PinnedDrop)]
18pub struct Subscription<Operation>
19where
20 Operation: GraphqlOperation,
21{
22 pub(in crate::client) id: SubscriptionId,
23 pub(in crate::client) stream: Option<stream::Boxed<Result<Operation::Response, Error>>>,
24 pub(in crate::client) actor: async_channel::Sender<ConnectionCommand>,
25 pub(in crate::client) drop_sender: Option<async_channel::Sender<SubscriptionId>>,
26}
27
28#[pin_project::pinned_drop]
29impl<Operation> PinnedDrop for Subscription<Operation>
30where
31 Operation: GraphqlOperation,
32{
33 fn drop(mut self: Pin<&mut Self>) {
34 let Some(drop_sender) = self.drop_sender.take() else {
35 return;
36 };
37 drop_sender.try_send(self.id).ok();
40 }
41}
42
43impl<Operation> Subscription<Operation>
44where
45 Operation: GraphqlOperation + Send,
46{
47 pub fn id(&self) -> SubscriptionId {
53 self.id
54 }
55
56 pub fn stop(mut self) {
58 let Some(drop_sender) = self.drop_sender.take() else {
59 return;
60 };
61
62 drop_sender.try_send(self.id).ok();
65 }
66
67 pub(super) fn join(mut self, future: future::Boxed<()>) -> Self
68 where
69 Operation::Response: 'static,
70 {
71 self.stream = self
72 .stream
73 .take()
74 .map(|stream| join_stream(stream, future).boxed());
75 self
76 }
77}
78
79impl<Operation> Stream for Subscription<Operation>
80where
81 Operation: GraphqlOperation + Unpin,
82{
83 type Item = Result<Operation::Response, Error>;
84
85 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86 match self.project().stream.as_mut() {
87 None => Poll::Ready(None),
88 Some(stream) => stream.poll_next(cx),
89 }
90 }
91}
92
93fn join_stream<Item>(
103 stream: stream::Boxed<Item>,
104 future: future::Boxed<()>,
105) -> impl Stream<Item = Item> {
106 stream::unfold(ProducerState::Running(stream, future), producer_handler)
107}
108
109enum ProducerState<'a, Item> {
110 Running(
111 Pin<Box<dyn Stream<Item = Item> + Send + 'a>>,
112 future::Boxed<()>,
113 ),
114 Draining(Pin<Box<dyn Stream<Item = Item> + Send + 'a>>),
115}
116
117async fn producer_handler<Item>(
118 mut state: ProducerState<'_, Item>,
119) -> Option<(Item, ProducerState<'_, Item>)> {
120 loop {
121 match state {
122 ProducerState::Running(mut stream, producer) => {
123 match read_from_producer(stream.next(), producer).await {
124 Some((item, producer)) => {
125 return Some((item?, ProducerState::Running(stream, producer)));
126 }
127 None => state = ProducerState::Draining(stream),
128 }
129 }
130 ProducerState::Draining(mut stream) => {
131 return Some((stream.next().await?, ProducerState::Draining(stream)));
132 }
133 }
134 }
135}