1#![warn(
5 missing_debug_implementations,
6 missing_docs,
7 rust_2018_idioms,
8 unreachable_pub
9)]
10
11use std::error::Error;
12use std::fmt::Debug;
13
14use futures_channel::mpsc::{SendError, Sender};
15use futures_core::Stream;
16use futures_sink::Sink;
17use futures_util::{sink::SinkExt, stream::TryStreamExt};
18
/// A message whose contents can be viewed as raw bytes.
///
/// Every implementor must also implement [`Debug`].
pub trait Message
where
    Self: Debug,
{
    /// Returns a borrowed view of this message's payload bytes.
    fn payload(&self) -> &[u8];
}
24
25#[derive(Debug)]
27pub struct AckMessage<M> {
28 message: M,
29 acks_tx: Sender<M>,
30}
31
32impl<M> AckMessage<M> {
33 pub fn new(message: M, acks_tx: Sender<M>) -> Self {
35 Self { message, acks_tx }
36 }
37
38 pub fn message(&self) -> &M {
40 &self.message
41 }
42
43 pub async fn ack(mut self) -> Result<(), SendError> {
45 self.acks_tx.send(self.message).await?;
46 Ok(())
47 }
48}
49
50impl<M: Message> Message for AckMessage<M> {
51 fn payload(&self) -> &[u8] {
52 &self.message().payload()
53 }
54}
55
56pub trait Subscriber:
58 Stream<Item = Result<AckMessage<<Self as Subscriber>::Message>, <Self as Subscriber>::Error>>
59 + Unpin
60{
61 type Message;
63
64 type Error: Error + Send + Sync + 'static;
66}
67
68pub trait Publisher:
70 Sink<<Self as Publisher>::Message, Error = <Self as Publisher>::Error> + Unpin
71{
72 type Message;
74
75 type Error: Error + Send + Sync + 'static;
77}
78
79pub trait AckHandler:
81 Stream<Item = Result<<Self as AckHandler>::Output, <Self as AckHandler>::Error>> + Unpin
82{
83 type Output;
85
86 type Error: Error + Send + Sync + 'static;
88}
89
/// A type that can report its own status as success or failure.
pub trait Statuser {
    /// The error returned when the status check fails.
    type Error: Error + Send + Sync + 'static;

    /// Checks the current status.
    ///
    /// # Errors
    ///
    /// Returns [`Statuser::Error`] when the implementor reports a failure.
    fn status(&self) -> Result<(), <Self as Statuser>::Error>;
}
98
99pub async fn noop_ack_handler<A: AckHandler<Output = ()>>(mut handler: A) -> Result<(), A::Error> {
102 while handler.try_next().await?.is_some() {}
103 Ok(())
104}