use core::pin::Pin;
use async_trait::async_trait;
use futures::{
task::{Context, Poll},
Stream,
};
use pin_project::pin_project;
use crate::{
client::sync::{ChannelRx, ChannelTx},
event::Event,
prelude::*,
query::Query,
Error,
};
/// A client that can create and cancel [`Subscription`]s for a [`Query`].
///
/// The `async` methods are enabled on this trait by the `#[async_trait]`
/// attribute.
#[async_trait]
pub trait SubscriptionClient {
/// Asynchronously requests a subscription for `query`.
///
/// On success, returns a [`Subscription`], which implements [`Stream`] and
/// yields `Result<Event, Error>` items.
///
/// # Errors
///
/// Returns an [`Error`] when the request fails; the exact failure conditions
/// are defined by the implementor.
async fn subscribe(&self, query: Query) -> Result<Subscription, Error>;
/// Asynchronously requests an unsubscribe for `query`.
///
/// NOTE(review): presumably this terminates the subscription(s) previously
/// created for the same query — confirm against the implementors.
///
/// # Errors
///
/// Returns an [`Error`] when the request fails; the exact failure conditions
/// are defined by the implementor.
async fn unsubscribe(&self, query: Query) -> Result<(), Error>;
/// Closes the client.
///
/// Takes `self` by value, so the client cannot be used after this call.
/// Unlike the other methods, this one is synchronous.
///
/// NOTE(review): presumably this shuts down the underlying connection —
/// confirm against the implementors.
///
/// # Errors
///
/// Returns an [`Error`] when closing fails; the exact failure conditions are
/// defined by the implementor.
fn close(self) -> Result<(), Error>;
}
/// Crate-internal alias for a [`ChannelTx`] carrying `Result<Event, Error>`,
/// the same item type a [`Subscription`] yields.
pub(crate) type SubscriptionTx = ChannelTx<Result<Event, Error>>;
/// Crate-internal alias for a [`ChannelRx`] carrying `Result<Event, Error>`.
///
/// This is the receiver type held by [`Subscription`] and polled by its
/// [`Stream`] implementation.
pub(crate) type SubscriptionRx = ChannelRx<Result<Event, Error>>;
/// A stream of `Result<Event, Error>` items associated with a [`Query`].
///
/// Instances are created inside the crate via `Subscription::new`; the
/// identifier and query are exposed through the `id` and `query` accessors,
/// and items are obtained by polling the value as a [`Stream`].
#[pin_project]
#[derive(Debug)]
pub struct Subscription {
// Identifier of this subscription, exposed via `id()`.
id: String,
// The query this subscription was created with, exposed via `query()`.
query: Query,
// Receiver from which items are pulled. Marked `#[pin]` so that the
// `Stream` implementation can obtain a pinned reference to it through
// `project()` and delegate `poll_next` to it.
#[pin]
rx: SubscriptionRx,
}
/// Streaming interface: every poll is forwarded to the inner receiver.
impl Stream for Subscription {
    type Item = Result<Event, Error>;

    /// Polls the pinned receiver for the next item and returns its result
    /// unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `project()` (generated by `#[pin_project]`) yields a pinned
        // reference to the `#[pin]` field `rx`, which is what gets polled.
        let this = self.project();
        this.rx.poll_next(cx)
    }
}
impl Subscription {
pub(crate) fn new(id: String, query: Query, rx: SubscriptionRx) -> Self {
Self { id, query, rx }
}
pub fn id(&self) -> &str {
&self.id
}
pub fn query(&self) -> &Query {
&self.query
}
}