use std::pin::Pin;
use std::task::{Context, Poll};
use futures_util::stream::{FusedStream, Stream, StreamExt};
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async};
use tungstenite::protocol::Message;
use url::Url;
use super::builder::SubscriberBuilder;
use super::request::get_request_builder;
use crate::auth::Auth;
use crate::error::Error;
use crate::payload::ReceivedPayload;
#[derive(Debug, Clone)]
pub struct Async {
auth: Option<Auth>,
}
impl Async {
#[inline]
pub(crate) fn new(builder: SubscriberBuilder) -> Result<Self, Error> {
Ok(Self { auth: builder.auth })
}
pub(crate) async fn subscribe(&self, url: &Url, topic: &str) -> Result<MessageStream, Error> {
let builder = get_request_builder(url, topic, &self.auth)?;
Ok(MessageStream {
socket: connect_async(builder).await?.0,
})
}
}
pub struct MessageStream {
socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
impl Stream for MessageStream {
type Item = Result<ReceivedPayload, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.socket.is_terminated() {
return Poll::Ready(None);
}
let text_message = loop {
let message = match self.socket.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok(message))) => message,
Poll::Ready(Some(Err(error))) => return Poll::Ready(Some(Err(Error::from(error)))),
Poll::Ready(None) => return Poll::Ready(None),
};
match message {
Message::Close(_) => return Poll::Ready(None),
Message::Text(text_message) => break text_message,
_ => {}
}
};
match serde_json::from_str(text_message.as_str()) {
Ok(received_message) => Poll::Ready(Some(Ok(received_message))),
Err(error) => Poll::Ready(Some(Err(Error::from(error)))),
}
}
}