rabbitmq-stream-client 0.6.0

A Rust client for RabbitMQ Stream
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;

use futures::Sink;
use futures::SinkExt;
use futures::Stream;
use futures::StreamExt;
use rabbitmq_stream_protocol::{Request, Response};

use tokio::sync::Mutex;

use crate::error::ClientError;

pub fn channel<S, T>(sink: S, stream: T) -> (ChannelSender<S>, ChannelReceiver<T>)
where
    S: Sink<Request>,
    T: Stream<Item = Result<Response, ClientError>>,
{
    let closed = Arc::new(AtomicBool::new(false));
    (
        ChannelSender {
            inner: Arc::new(Mutex::new(sink)),
            closed: closed.clone(),
        },
        ChannelReceiver {
            inner: stream,
            closed,
        },
    )
}

#[derive(Clone)]
pub struct ChannelSender<T>
where
    T: Sink<Request>,
{
    inner: Arc<Mutex<T>>,
    closed: Arc<AtomicBool>,
}

impl<T> ChannelSender<T>
where
    T: Sink<Request>,
{
    pub fn is_closed(&self) -> bool {
        self.closed.load(Relaxed)
    }
}

impl<T> ChannelReceiver<T>
where
    T: Stream<Item = Result<Response, ClientError>>,
{
    pub fn is_closed(&self) -> bool {
        self.closed.load(Relaxed)
    }
}

pub struct ChannelReceiver<T>
where
    T: Stream<Item = Result<Response, ClientError>>,
{
    inner: T,
    closed: Arc<AtomicBool>,
}

impl<T: Sink<Request> + Unpin> ChannelSender<T> {
    pub async fn send(&self, item: Request) -> Result<(), T::Error> {
        let mut channel = self.inner.lock().await;
        channel.send(item).await
    }

    pub async fn close(&self) -> Result<(), T::Error> {
        let mut channel = self.inner.lock().await;
        channel.close().await?;
        self.closed.store(true, Relaxed);

        Ok(())
    }
}

impl<T: Stream<Item = Result<Response, ClientError>> + Unpin> ChannelReceiver<T> {
    pub async fn next(&mut self) -> Option<Result<Response, ClientError>> {
        self.inner.next().await
    }
}