use std::{
pin::Pin,
task::{Context, Poll},
};
use futures_lite::{Stream, StreamExt, future, stream};
use crate::{
Error, SubscriptionId, client::production_future::read_from_producer, graphql::GraphqlOperation,
};
use super::ConnectionCommand;
/// Handle to a single subscription that can be polled as a stream.
///
/// The `Stream` implementation forwards to the wrapped `stream` and yields
/// `Result<Operation::Response, Error>` items. When the value is stopped or
/// dropped, its `id` is sent on `drop_sender` (at most once).
#[pin_project::pin_project(PinnedDrop)]
pub struct Subscription<Operation>
where
Operation: GraphqlOperation,
{
/// Identifier of this subscription; returned by `id()` and sent on
/// `drop_sender` when the subscription is stopped or dropped.
pub(in crate::client) id: SubscriptionId,
/// Boxed stream of results. When this is `None`, polling the subscription
/// reports end-of-stream immediately.
pub(in crate::client) stream: Option<stream::Boxed<Result<Operation::Response, Error>>>,
/// Sender for `ConnectionCommand`s.
// NOTE(review): not read anywhere in this file; presumably used by other
// code in `crate::client` to talk to the connection actor — confirm.
pub(in crate::client) actor: async_channel::Sender<ConnectionCommand>,
/// Channel on which `id` is reported on stop/drop. It is taken (left as
/// `None`) the first time it is used, so the id is never sent twice.
pub(in crate::client) drop_sender: Option<async_channel::Sender<SubscriptionId>>,
}
#[pin_project::pinned_drop]
impl<Operation> PinnedDrop for Subscription<Operation>
where
    Operation: GraphqlOperation,
{
    // Reports this subscription's id on `drop_sender` when the value is dropped.
    //
    // Nothing is sent when `drop_sender` has already been taken (for example
    // by `stop`), so the id is reported at most once.
    fn drop(mut self: Pin<&mut Self>) {
        let id = self.id;
        if let Some(sender) = self.drop_sender.take() {
            // `drop` cannot await, so the non-blocking `try_send` is used and
            // a failed send is deliberately ignored.
            let _ = sender.try_send(id);
        }
    }
}
impl<Operation> Subscription<Operation>
where
    Operation: GraphqlOperation + Send,
{
    /// Returns the identifier of this subscription.
    pub fn id(&self) -> SubscriptionId {
        self.id
    }

    /// Consumes the subscription and reports its id on `drop_sender`.
    ///
    /// The sender is taken out of `self` first, so the drop handler that runs
    /// when this method returns finds `None` and does not send the id again.
    /// The send is non-blocking and a failure is silently ignored.
    pub fn stop(mut self) {
        if let Some(sender) = self.drop_sender.take() {
            let _ = sender.try_send(self.id);
        }
    }

    /// Combines the subscription's stream with `future` via `join_stream` and
    /// returns the updated subscription.
    ///
    /// When there is no inner stream (`stream` is `None`) the subscription is
    /// returned unchanged and `future` is dropped without being polled.
    pub(super) fn join(mut self, future: future::Boxed<()>) -> Self
    where
        Operation::Response: 'static,
    {
        if let Some(inner) = self.stream.take() {
            self.stream = Some(join_stream(inner, future).boxed());
        }
        self
    }
}
impl<Operation> Stream for Subscription<Operation>
where
    Operation: GraphqlOperation + Unpin,
{
    type Item = Result<Operation::Response, Error>;

    /// Polls the wrapped stream for the next result.
    ///
    /// A subscription without an inner stream reports end-of-stream
    /// (`Poll::Ready(None)`) straight away.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        if let Some(inner) = this.stream.as_mut() {
            inner.poll_next(cx)
        } else {
            Poll::Ready(None)
        }
    }
}
/// Builds a stream that yields the items of `stream` while `future` is driven
/// alongside it.
///
/// The result is a `stream::unfold` seeded with `ProducerState::Running` and
/// stepped by `producer_handler`.
fn join_stream<Item>(
    stream: stream::Boxed<Item>,
    future: future::Boxed<()>,
) -> impl Stream<Item = Item> {
    let initial = ProducerState::Running(stream, future);
    stream::unfold(initial, producer_handler)
}
/// Boxed, `Send` stream of `Item`s valid for the lifetime `'a`.
type ProducerStream<'a, Item> = Pin<Box<dyn Stream<Item = Item> + Send + 'a>>;

/// State carried between steps of `producer_handler`.
enum ProducerState<'a, Item> {
    /// The item stream together with the producer future that is still being
    /// driven next to it.
    Running(ProducerStream<'a, Item>, future::Boxed<()>),
    /// Only the item stream is left; the producer future is gone and the
    /// remaining items are read directly.
    Draining(ProducerStream<'a, Item>),
}
/// Step function for `stream::unfold`: produces the next item and the state
/// to continue from, or `None` once the item stream has ended.
///
/// * `Running` — awaits `read_from_producer` with the stream's next item and
///   the producer future. If it hands back an item and the producer, the item
///   is yielded and the state stays `Running`. If it returns `None`, the
///   stream is read directly from then on.
/// * `Draining` — awaits the stream's next item directly.
///
/// NOTE(review): `read_from_producer` is defined elsewhere; it presumably
/// returns `None` when the producer future completes first — confirm.
async fn producer_handler<Item>(
    state: ProducerState<'_, Item>,
) -> Option<(Item, ProducerState<'_, Item>)> {
    // Settle on the stream to read directly, returning early while the
    // producer is still alive and an item (or end-of-stream) came through it.
    let mut remaining = match state {
        ProducerState::Running(mut stream, producer) => {
            match read_from_producer(stream.next(), producer).await {
                Some((item, producer)) => {
                    // `item?` ends the unfold when the stream itself is exhausted.
                    return Some((item?, ProducerState::Running(stream, producer)));
                }
                None => stream,
            }
        }
        ProducerState::Draining(stream) => stream,
    };

    let item = remaining.next().await?;
    Some((item, ProducerState::Draining(remaining)))
}