Skip to main content

graphql_ws_client/client/
subscription.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_lite::{Stream, StreamExt, future, stream};
7
8use crate::{
9    Error, SubscriptionId, client::production_future::read_from_producer, graphql::GraphqlOperation,
10};
11
12use super::ConnectionCommand;
13
/// A `futures::Stream` for a subscription.
///
/// Emits an item for each message received by the subscription.
///
/// When the value is dropped, or [`Subscription::stop`] is called, the
/// subscription's id is sent on `drop_sender` (if one is still present).
#[pin_project::pin_project(PinnedDrop)]
pub struct Subscription<Operation>
where
    Operation: GraphqlOperation,
{
    /// Identifier of this subscription; returned by `id()` and sent on
    /// `drop_sender` when the subscription is stopped or dropped.
    pub(in crate::client) id: SubscriptionId,
    /// The boxed stream of responses that `poll_next` forwards to.
    /// When this is `None` the subscription reports end-of-stream.
    pub(in crate::client) stream: Option<stream::Boxed<Result<Operation::Response, Error>>>,
    /// Sender for `ConnectionCommand`s. Not used within this file -
    /// NOTE(review): presumably talks to the connection actor; confirm
    /// against the code that constructs `Subscription`.
    pub(in crate::client) actor: async_channel::Sender<ConnectionCommand>,
    /// Channel on which `id` is sent when the subscription goes away.
    /// Wrapped in an `Option` so it can be `take`n, which ensures the id is
    /// sent at most once across `stop()` and `Drop`.
    pub(in crate::client) drop_sender: Option<async_channel::Sender<SubscriptionId>>,
}
27
28#[pin_project::pinned_drop]
29impl<Operation> PinnedDrop for Subscription<Operation>
30where
31    Operation: GraphqlOperation,
32{
33    fn drop(mut self: Pin<&mut Self>) {
34        let Some(drop_sender) = self.drop_sender.take() else {
35            return;
36        };
37        // We try_send here but the drop_sender channel _should_ be unbounded so
38        // this should always work if the connection actor is still alive.
39        drop_sender.try_send(self.id).ok();
40    }
41}
42
43impl<Operation> Subscription<Operation>
44where
45    Operation: GraphqlOperation + Send,
46{
47    /// Returns the identifier for this subscription.
48    ///
49    /// This can be used with [`crate::Client::stop`] to stop
50    /// a running subscription without needing access to the `Subscription`
51    /// itself.
52    pub fn id(&self) -> SubscriptionId {
53        self.id
54    }
55
56    /// Stops this subscription
57    pub fn stop(mut self) {
58        let Some(drop_sender) = self.drop_sender.take() else {
59            return;
60        };
61
62        // We try_send here but the drop_sender channel _should_ be unbounded so
63        // this should always work if the connection actor is still alive.
64        drop_sender.try_send(self.id).ok();
65    }
66
67    pub(super) fn join(mut self, future: future::Boxed<()>) -> Self
68    where
69        Operation::Response: 'static,
70    {
71        self.stream = self
72            .stream
73            .take()
74            .map(|stream| join_stream(stream, future).boxed());
75        self
76    }
77}
78
79impl<Operation> Stream for Subscription<Operation>
80where
81    Operation: GraphqlOperation + Unpin,
82{
83    type Item = Result<Operation::Response, Error>;
84
85    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86        match self.project().stream.as_mut() {
87            None => Poll::Ready(None),
88            Some(stream) => stream.poll_next(cx),
89        }
90    }
91}
92
93/// Joins a future onto the execution of a stream returning a stream that also polls
94/// the given future.
95///
96/// If the future ends the stream will still continue till completion but if the stream
97/// ends the future will be cancelled.
98///
99/// This can be used when you have the receivng side of a channel and a future that sends
100/// on that channel - combining the two into a single stream that'll run till the channel
101/// is exhausted.  If you drop the stream you also cancel the underlying process.
102fn join_stream<Item>(
103    stream: stream::Boxed<Item>,
104    future: future::Boxed<()>,
105) -> impl Stream<Item = Item> {
106    stream::unfold(ProducerState::Running(stream, future), producer_handler)
107}
108
109enum ProducerState<'a, Item> {
110    Running(
111        Pin<Box<dyn Stream<Item = Item> + Send + 'a>>,
112        future::Boxed<()>,
113    ),
114    Draining(Pin<Box<dyn Stream<Item = Item> + Send + 'a>>),
115}
116
117async fn producer_handler<Item>(
118    mut state: ProducerState<'_, Item>,
119) -> Option<(Item, ProducerState<'_, Item>)> {
120    loop {
121        match state {
122            ProducerState::Running(mut stream, producer) => {
123                match read_from_producer(stream.next(), producer).await {
124                    Some((item, producer)) => {
125                        return Some((item?, ProducerState::Running(stream, producer)));
126                    }
127                    None => state = ProducerState::Draining(stream),
128                }
129            }
130            ProducerState::Draining(mut stream) => {
131                return Some((stream.next().await?, ProducerState::Draining(stream)));
132            }
133        }
134    }
135}