use std::{
pin::Pin,
task::{Context, Poll},
};
use futures_lite::{future, stream, Stream, StreamExt};
use crate::{graphql::GraphqlOperation, next::production_future::read_from_producer, Error};
use super::ConnectionCommand;
/// Handle to a single subscription.
///
/// Implements `Stream` (when `Operation: Unpin`), yielding
/// `Result<Operation::Response, Error>` items taken from the inner boxed
/// stream.
#[pin_project::pin_project]
pub struct Subscription<Operation>
where
Operation: GraphqlOperation,
{
/// Identifier carried by the `ConnectionCommand::Cancel` that `stop` sends.
/// NOTE(review): presumably assigned by the connection actor when the
/// subscription was created — confirm against the caller.
pub(super) id: usize,
/// Boxed stream of results; polled by the `Stream` implementation.
pub(super) stream: stream::Boxed<Result<Operation::Response, Error>>,
/// Sending half of the channel on which `ConnectionCommand`s are issued.
pub(super) actor: async_channel::Sender<ConnectionCommand>,
}
impl<Operation> Subscription<Operation>
where
Operation: GraphqlOperation + Send,
{
pub async fn stop(self) -> Result<(), Error> {
self.actor
.send(ConnectionCommand::Cancel(self.id))
.await
.map_err(|error| Error::Send(error.to_string()))
}
pub(super) fn join(self, future: future::Boxed<()>) -> Self
where
Operation::Response: 'static,
{
Self {
stream: join_stream(self.stream, future).boxed(),
..self
}
}
}
/// Exposes the subscription's inner boxed stream as a `Stream`.
impl<Operation> Stream for Subscription<Operation>
where
    Operation: GraphqlOperation + Unpin,
{
    type Item = Result<Operation::Response, Error>;

    /// Polls the inner boxed stream and forwards whatever it reports.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let inner = this.stream.as_mut();
        inner.poll_next(cx)
    }
}
/// Combines `stream` and `future` into a single stream.
///
/// The result is an `unfold` stream whose state starts as
/// `ProducerState::Running(stream, future)` and is advanced one item at a time
/// by `producer_handler`.
///
/// NOTE(review): how `future` is driven depends on `read_from_producer`, which
/// is defined elsewhere — presumably it is polled alongside each read of the
/// stream; confirm there.
fn join_stream<Item>(
    stream: stream::Boxed<Item>,
    future: future::Boxed<()>,
) -> impl Stream<Item = Item> {
    let initial = ProducerState::Running(stream, future);
    stream::unfold(initial, producer_handler)
}
/// State threaded through `stream::unfold` by `join_stream` and
/// `producer_handler`.
enum ProducerState<'a, Item> {
/// The stream together with the future that is handed to
/// `read_from_producer` on each step.
Running(
Pin<Box<dyn Stream<Item = Item> + Send + 'a>>,
future::Boxed<()>,
),
/// Only the stream remains. `producer_handler` switches to this state when
/// `read_from_producer` returns `None`, and from then on reads the stream
/// directly.
Draining(Pin<Box<dyn Stream<Item = Item> + Send + 'a>>),
}
/// Step function for the `unfold` stream built by `join_stream`.
///
/// Produces the next item together with the state for the following step, or
/// `None` once the underlying stream has no more items.
///
/// * `Running`: awaits `read_from_producer` on the stream's next item and the
///   future. If it returns a pair, the item is emitted (or the stream ends if
///   that item is `None`) and the state stays `Running` with the returned
///   future. If it returns `None`, the handler falls through to draining.
/// * `Draining`: awaits the stream's next item directly and stays `Draining`.
///
/// NOTE(review): `read_from_producer` is defined elsewhere; it presumably
/// returns `None` when the future finishes before the stream yields — confirm.
async fn producer_handler<Item>(
    state: ProducerState<'_, Item>,
) -> Option<(Item, ProducerState<'_, Item>)> {
    // Either return straight from the running state, or end up holding the
    // stream that now has to be read on its own.
    let mut remaining = match state {
        ProducerState::Running(mut stream, producer) => {
            match read_from_producer(stream.next(), producer).await {
                Some((item, producer)) => {
                    return Some((item?, ProducerState::Running(stream, producer)));
                }
                None => stream,
            }
        }
        ProducerState::Draining(stream) => stream,
    };

    let item = remaining.next().await?;
    Some((item, ProducerState::Draining(remaining)))
}