use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_nats::Subscriber;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;
use crate::error::{DinkError, Result};
/// A typed stream of service responses delivered over NATS subscriptions.
///
/// Payloads arriving on the data subscription are decoded into `T`; any message on
/// the done subscription (or the data subscription closing) ends the stream.
pub struct DinkStream<T> {
// Subscription whose message payloads are decoded into `T`.
data_sub: Subscriber,
// Subscription on which the arrival of any message marks the end of the stream.
done_sub: Subscriber,
// Optional error subscription. It is only stored here and never polled in this
// file; presumably it is held to keep the subscription alive — TODO confirm.
_error_sub: Option<Subscriber>,
// Subject that receives an empty message when the stream is cancelled, if any.
cancel_subject: Option<String>,
// Client used to publish the cancel message.
nc: async_nats::Client,
// Set once the stream has finished; afterwards no further items are produced.
done: bool,
// Ties the item type `T` to the struct without storing a value of it.
_phantom: PhantomData<T>,
}
impl<T: DeserializeOwned> DinkStream<T> {
    /// Creates a stream from its subscriptions and the client used for cancellation.
    ///
    /// * `data_sub` - subscription carrying the response payloads.
    /// * `done_sub` - subscription whose first message ends the stream.
    /// * `error_sub` - optional error subscription; stored but not polled here.
    /// * `cancel_subject` - subject to publish an empty message to on cancel, if any.
    /// * `nc` - client used to publish the cancel message.
    pub(crate) fn new(
        data_sub: Subscriber,
        done_sub: Subscriber,
        error_sub: Option<Subscriber>,
        cancel_subject: Option<String>,
        nc: async_nats::Client,
    ) -> Self {
        Self {
            data_sub,
            done_sub,
            _error_sub: error_sub,
            cancel_subject,
            nc,
            done: false,
            _phantom: PhantomData,
        }
    }

    /// Waits for the next item of the stream.
    ///
    /// Returns `Ok(Some(item))` for each decoded payload and `Ok(None)` once the
    /// stream has finished (done signal received, or the data subscription closed).
    /// After the first `Ok(None)` every further call returns `Ok(None)` immediately.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `parse_service_response` when a payload cannot
    /// be decoded. The stream is not marked as finished in that case, so the caller
    /// may keep receiving.
    pub async fn recv(&mut self) -> Result<Option<T>> {
        if self.done {
            return Ok(None);
        }
        tokio::select! {
            // Poll in declaration order, data first: a done signal that is already
            // waiting must not win over data messages that are still buffered,
            // otherwise those trailing items would be silently dropped.
            biased;
            msg = self.data_sub.next() => {
                match msg {
                    Some(msg) => {
                        let parsed: T =
                            crate::internal::envelope::parse_service_response(&msg.payload)?;
                        Ok(Some(parsed))
                    }
                    None => {
                        // The data subscription itself was closed.
                        self.done = true;
                        Ok(None)
                    }
                }
            }
            _done = self.done_sub.next() => {
                // Either a done message arrived or the done subscription closed;
                // both end the stream.
                self.done = true;
                Ok(None)
            }
        }
    }

    /// Publishes an empty message to the cancel subject, if one was configured.
    ///
    /// Without a cancel subject this is a no-op that returns `Ok(())`. The stream is
    /// not marked as finished by this call.
    ///
    /// # Errors
    ///
    /// Returns `DinkError::Nats` carrying the publish error's message when the
    /// publish fails.
    pub async fn cancel(&self) -> Result<()> {
        if let Some(subject) = &self.cancel_subject {
            self.nc
                .publish(subject.clone(), Bytes::new())
                .await
                .map_err(|e| DinkError::Nats(e.to_string()))?;
        }
        Ok(())
    }
}
/// `Stream` adapter yielding one `Result<T>` per data message.
///
/// A payload that fails to decode is yielded as `Err` and does not end the stream.
impl<T: DeserializeOwned + Unpin> Stream for DinkStream<T> {
    type Item = Result<T>;

    /// Polls for the next item: buffered data first, then the done signal.
    ///
    /// Returns `Poll::Ready(None)` once the stream has finished and on every poll
    /// after that.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.done {
            return Poll::Ready(None);
        }

        // Drain data before looking at the done signal. Checking done first would
        // discard any data messages still buffered when the signal arrives.
        match Pin::new(&mut this.data_sub).poll_next(cx) {
            Poll::Ready(Some(msg)) => {
                let result = crate::internal::envelope::parse_service_response(&msg.payload);
                return Poll::Ready(Some(result));
            }
            Poll::Ready(None) => {
                // The data subscription itself was closed.
                this.done = true;
                return Poll::Ready(None);
            }
            Poll::Pending => {}
        }

        // No data is ready. A done message, or the done subscription closing, ends
        // the stream; otherwise both subscriptions have registered the waker.
        match Pin::new(&mut this.done_sub).poll_next(cx) {
            Poll::Ready(_) => {
                this.done = true;
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
impl<T> Drop for DinkStream<T> {
fn drop(&mut self) {
if !self.done {
if let Some(ref subject) = self.cancel_subject {
let nc = self.nc.clone();
let subject = subject.clone();
let _ = tokio::spawn(async move {
let _ = nc.publish(subject, Bytes::new()).await;
});
}
}
}
}