1#![warn(
5    missing_debug_implementations,
6    missing_docs,
7    rust_2018_idioms,
8    unreachable_pub
9)]
10
11use std::error::Error;
12use std::fmt::Debug;
13
14use futures_channel::mpsc::{SendError, Sender};
15use futures_core::Stream;
16use futures_sink::Sink;
17use futures_util::{sink::SinkExt, stream::TryStreamExt};
18
19pub trait Message: Debug {
21    fn payload(&self) -> &[u8];
23}
24
25#[derive(Debug)]
27pub struct AckMessage<M> {
28    message: M,
29    acks_tx: Sender<M>,
30}
31
32impl<M> AckMessage<M> {
33    pub fn new(message: M, acks_tx: Sender<M>) -> Self {
35        Self { message, acks_tx }
36    }
37
38    pub fn message(&self) -> &M {
40        &self.message
41    }
42
43    pub async fn ack(mut self) -> Result<(), SendError> {
45        self.acks_tx.send(self.message).await?;
46        Ok(())
47    }
48}
49
50impl<M: Message> Message for AckMessage<M> {
51    fn payload(&self) -> &[u8] {
52        &self.message().payload()
53    }
54}
55
56pub trait Subscriber:
58    Stream<Item = Result<AckMessage<<Self as Subscriber>::Message>, <Self as Subscriber>::Error>>
59    + Unpin
60{
61    type Message;
63
64    type Error: Error + Send + Sync + 'static;
66}
67
68pub trait Publisher:
70    Sink<<Self as Publisher>::Message, Error = <Self as Publisher>::Error> + Unpin
71{
72    type Message;
74
75    type Error: Error + Send + Sync + 'static;
77}
78
79pub trait AckHandler:
81    Stream<Item = Result<<Self as AckHandler>::Output, <Self as AckHandler>::Error>> + Unpin
82{
83    type Output;
85
86    type Error: Error + Send + Sync + 'static;
88}
89
90pub trait Statuser {
92    type Error: Error + Send + Sync + 'static;
94
95    fn status(&self) -> Result<(), Self::Error>;
97}
98
99pub async fn noop_ack_handler<A: AckHandler<Output = ()>>(mut handler: A) -> Result<(), A::Error> {
102    while handler.try_next().await?.is_some() {}
103    Ok(())
104}