graphql_ws_client/next/
stream.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures_lite::{future, stream, Stream, StreamExt};
7
8use crate::{graphql::GraphqlOperation, next::production_future::read_from_producer, Error};
9
10use super::ConnectionCommand;
11
12#[pin_project::pin_project]
16pub struct Subscription<Operation>
17where
18 Operation: GraphqlOperation,
19{
20 pub(super) id: usize,
21 pub(super) stream: stream::Boxed<Result<Operation::Response, Error>>,
22 pub(super) actor: async_channel::Sender<ConnectionCommand>,
23}
24
25impl<Operation> Subscription<Operation>
26where
27 Operation: GraphqlOperation + Send,
28{
29 pub async fn stop(self) -> Result<(), Error> {
35 self.actor
36 .send(ConnectionCommand::Cancel(self.id))
37 .await
38 .map_err(|error| Error::Send(error.to_string()))
39 }
40
41 pub(super) fn join(self, future: future::Boxed<()>) -> Self
42 where
43 Operation::Response: 'static,
44 {
45 Self {
46 stream: join_stream(self.stream, future).boxed(),
47 ..self
48 }
49 }
50}
51
52impl<Operation> Stream for Subscription<Operation>
53where
54 Operation: GraphqlOperation + Unpin,
55{
56 type Item = Result<Operation::Response, Error>;
57
58 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59 self.project().stream.as_mut().poll_next(cx)
60 }
61}
62
63fn join_stream<Item>(
73 stream: stream::Boxed<Item>,
74 future: future::Boxed<()>,
75) -> impl Stream<Item = Item> {
76 stream::unfold(ProducerState::Running(stream, future), producer_handler)
77}
78
79enum ProducerState<'a, Item> {
80 Running(
81 Pin<Box<dyn Stream<Item = Item> + Send + 'a>>,
82 future::Boxed<()>,
83 ),
84 Draining(Pin<Box<dyn Stream<Item = Item> + Send + 'a>>),
85}
86
87async fn producer_handler<Item>(
88 mut state: ProducerState<'_, Item>,
89) -> Option<(Item, ProducerState<'_, Item>)> {
90 loop {
91 match state {
92 ProducerState::Running(mut stream, producer) => {
93 match read_from_producer(stream.next(), producer).await {
94 Some((item, producer)) => {
95 return Some((item?, ProducerState::Running(stream, producer)));
96 }
97 None => state = ProducerState::Draining(stream),
98 }
99 }
100 ProducerState::Draining(mut stream) => {
101 return Some((stream.next().await?, ProducerState::Draining(stream)));
102 }
103 }
104 }
105}