graphql_ws_client/next/
stream.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures_lite::{future, stream, Stream, StreamExt};
7
8use crate::{graphql::GraphqlOperation, next::production_future::read_from_producer, Error};
9
10use super::ConnectionCommand;
11
12/// A `futures::Stream` for a subscription.
13///
14/// Emits an item for each message received by the subscription.
15#[pin_project::pin_project]
16pub struct Subscription<Operation>
17where
18    Operation: GraphqlOperation,
19{
20    pub(super) id: usize,
21    pub(super) stream: stream::Boxed<Result<Operation::Response, Error>>,
22    pub(super) actor: async_channel::Sender<ConnectionCommand>,
23}
24
25impl<Operation> Subscription<Operation>
26where
27    Operation: GraphqlOperation + Send,
28{
29    /// Stops the subscription by sending a Complete message to the server.
30    ///
31    /// # Errors
32    ///
33    /// Will return `Err` if the stop operation fails.
34    pub async fn stop(self) -> Result<(), Error> {
35        self.actor
36            .send(ConnectionCommand::Cancel(self.id))
37            .await
38            .map_err(|error| Error::Send(error.to_string()))
39    }
40
41    pub(super) fn join(self, future: future::Boxed<()>) -> Self
42    where
43        Operation::Response: 'static,
44    {
45        Self {
46            stream: join_stream(self.stream, future).boxed(),
47            ..self
48        }
49    }
50}
51
52impl<Operation> Stream for Subscription<Operation>
53where
54    Operation: GraphqlOperation + Unpin,
55{
56    type Item = Result<Operation::Response, Error>;
57
58    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59        self.project().stream.as_mut().poll_next(cx)
60    }
61}
62
63/// Joins a future onto the execution of a stream returning a stream that also polls
64/// the given future.
65///
66/// If the future ends the stream will still continue till completion but if the stream
67/// ends the future will be cancelled.
68///
69/// This can be used when you have the receivng side of a channel and a future that sends
70/// on that channel - combining the two into a single stream that'll run till the channel
71/// is exhausted.  If you drop the stream you also cancel the underlying process.
72fn join_stream<Item>(
73    stream: stream::Boxed<Item>,
74    future: future::Boxed<()>,
75) -> impl Stream<Item = Item> {
76    stream::unfold(ProducerState::Running(stream, future), producer_handler)
77}
78
79enum ProducerState<'a, Item> {
80    Running(
81        Pin<Box<dyn Stream<Item = Item> + Send + 'a>>,
82        future::Boxed<()>,
83    ),
84    Draining(Pin<Box<dyn Stream<Item = Item> + Send + 'a>>),
85}
86
87async fn producer_handler<Item>(
88    mut state: ProducerState<'_, Item>,
89) -> Option<(Item, ProducerState<'_, Item>)> {
90    loop {
91        match state {
92            ProducerState::Running(mut stream, producer) => {
93                match read_from_producer(stream.next(), producer).await {
94                    Some((item, producer)) => {
95                        return Some((item?, ProducerState::Running(stream, producer)));
96                    }
97                    None => state = ProducerState::Draining(stream),
98                }
99            }
100            ProducerState::Draining(mut stream) => {
101                return Some((stream.next().await?, ProducerState::Draining(stream)));
102            }
103        }
104    }
105}