watermelon 0.4.4

High-level, actor-based client implementation for NATS Core and NATS JetStream
Documentation
use std::{
    collections::VecDeque,
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use futures_core::{FusedStream, Stream};
use serde::Deserialize;
use serde_json::json;
use watermelon_proto::Subject;

use crate::{
    client::{self, JetstreamClient, jetstream::JetstreamError},
    util::BoxFuture,
};

/// A request to list streams
///
/// Obtained from [`JetstreamClient::streams`].
///
/// Implements `futures_core::Stream`: each item is either one
/// [`client::Stream`] or a [`JetstreamError`]. Pages are requested lazily
/// with `STREAM.LIST` as the value is polled, and received entries are
/// buffered until they have been yielded.
#[must_use = "streams do nothing unless polled"]
pub struct Streams {
    /// Client used to send the `STREAM.LIST` requests.
    client: JetstreamClient,
    /// Value sent as the `offset` field of the next `STREAM.LIST` request.
    offset: u32,
    /// In-flight page request, if any; reset to `None` once it resolves.
    fetch: Option<BoxFuture<'static, Result<StreamsResponse, JetstreamError>>>,
    /// Entries of the most recently received page that were not yielded yet.
    buffer: VecDeque<client::Stream>,
    /// Set once a page with fewer entries than the response's `limit` has
    /// been received; no further requests are made afterwards.
    exhausted: bool,
}

/// Deserialized payload of a `STREAM.LIST` response.
///
/// Only the fields needed for pagination and for yielding items are
/// declared here.
#[derive(Debug, Deserialize)]
struct StreamsResponse {
    /// Compared against the number of returned entries to detect the last
    /// page. Presumably the server's page size; confirm against the
    /// JetStream API schema.
    limit: u32,
    /// Stream entries contained in this page, in response order.
    streams: VecDeque<client::Stream>,
}

impl Streams {
    /// Builds a lister that has not sent any request yet.
    ///
    /// The listing starts at offset `0` with an empty buffer; the first
    /// request is only created when the value is polled.
    pub(crate) fn new(client: JetstreamClient) -> Self {
        let buffer = VecDeque::new();

        Self {
            buffer,
            fetch: None,
            exhausted: false,
            offset: 0,
            client,
        }
    }
}

impl Stream for Streams {
    type Item = Result<client::Stream, JetstreamError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        if let Some(stream) = this.buffer.pop_front() {
            return Poll::Ready(Some(Ok(stream)));
        }

        if this.exhausted {
            return Poll::Ready(None);
        }

        let fetch = this.fetch.get_or_insert_with(|| {
            let client = this.client.clone();
            let offset = this.offset;

            Box::pin(async move {
                let response_fut = client
                    .client()
                    .request(client.subject_for_request(&Subject::from_static("STREAM.LIST")))
                    .response_timeout(client.request_timeout)
                    .payload(
                        serde_json::to_vec(&json!({
                            "offset": offset,
                        }))
                        .unwrap()
                        .into(),
                    )
                    .await
                    .map_err(JetstreamError::ClientClosed)?;
                let response = response_fut.await.map_err(JetstreamError::ResponseError)?;
                let payload =
                    serde_json::from_slice(&response.base.payload).map_err(JetstreamError::Json)?;
                Ok(payload)
            })
        });

        match Pin::new(fetch).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(response)) => {
                this.fetch = None;
                this.buffer = response.streams;
                if this.buffer.len() < response.limit as usize {
                    this.exhausted = true;
                } else if !this.buffer.is_empty() {
                    this.offset += 1;
                }

                cx.waker().wake_by_ref();
                Poll::Pending
            }
            Poll::Ready(Err(err)) => {
                this.fetch = None;
                Poll::Ready(Some(Err(err)))
            }
        }
    }
}

impl FusedStream for Streams {
    /// Reports whether the listing has finished.
    ///
    /// This is `true` only when the last page has been received *and* every
    /// buffered entry has already been yielded.
    fn is_terminated(&self) -> bool {
        let drained = self.buffer.is_empty();
        self.exhausted && drained
    }
}