janus/
lib.rs

1//! # Janus
2//!
3//! Janus is a thin abstraction for synchronous/asynchronous messages publishing and consumption.
4#![warn(
5    missing_debug_implementations,
6    missing_docs,
7    rust_2018_idioms,
8    unreachable_pub
9)]
10
11use std::error::Error;
12use std::fmt::Debug;
13
14use futures_channel::mpsc::{SendError, Sender};
15use futures_core::Stream;
16use futures_sink::Sink;
17use futures_util::{sink::SinkExt, stream::TryStreamExt};
18
19/// Provides a generic abstraction over a message.
20pub trait Message: Debug {
21    /// Returns the payload of the message.
22    fn payload(&self) -> &[u8];
23}
24
25/// Wraps a message with a channel for sending acknowledgments.
26#[derive(Debug)]
27pub struct AckMessage<M> {
28    message: M,
29    acks_tx: Sender<M>,
30}
31
32impl<M> AckMessage<M> {
33    /// Creates a new acknowledged message.
34    pub fn new(message: M, acks_tx: Sender<M>) -> Self {
35        Self { message, acks_tx }
36    }
37
38    /// Returns a reference to the underlying message.
39    pub fn message(&self) -> &M {
40        &self.message
41    }
42
43    /// Sends the message to the `AckHandler`.
44    pub async fn ack(mut self) -> Result<(), SendError> {
45        self.acks_tx.send(self.message).await?;
46        Ok(())
47    }
48}
49
50impl<M: Message> Message for AckMessage<M> {
51    fn payload(&self) -> &[u8] {
52        &self.message().payload()
53    }
54}
55
56/// Produces a stream of `Message`s.
57pub trait Subscriber:
58    Stream<Item = Result<AckMessage<<Self as Subscriber>::Message>, <Self as Subscriber>::Error>>
59    + Unpin
60{
61    /// The type of `Message` that the subscriber will produce when successful.
62    type Message;
63
64    /// The type of `Error` that the subscriber will produce when it fails.
65    type Error: Error + Send + Sync + 'static;
66}
67
68/// Publishes `Message`s via a sink.
69pub trait Publisher:
70    Sink<<Self as Publisher>::Message, Error = <Self as Publisher>::Error> + Unpin
71{
72    /// The type of `Message` that the publisher will produce when successful.
73    type Message;
74
75    /// The type of `Error` that the publisher will produce when it fails.
76    type Error: Error + Send + Sync + 'static;
77}
78
79/// Produces a stream of acknowledgments from an associated `Publisher` or `Subscriber`.
80pub trait AckHandler:
81    Stream<Item = Result<<Self as AckHandler>::Output, <Self as AckHandler>::Error>> + Unpin
82{
83    /// The type of output that the acknowledhment handler will produce when it fails.
84    type Output;
85
86    /// The type of `Error` that the acknowledgment handler will produce when it fails.
87    type Error: Error + Send + Sync + 'static;
88}
89
90/// Checks the status of an adapter.
91pub trait Statuser {
92    /// The type of `Error` that the acknowledgment handler will produce when it fails.
93    type Error: Error + Send + Sync + 'static;
94
95    /// Determines the status of the adapter.
96    fn status(&self) -> Result<(), Self::Error>;
97}
98
99/// A convenience function to continuously processes acks until an error is
100/// encountered.
101pub async fn noop_ack_handler<A: AckHandler<Output = ()>>(mut handler: A) -> Result<(), A::Error> {
102    while handler.try_next().await?.is_some() {}
103    Ok(())
104}