Skip to main content

fuel_message_broker/
msg_broker.rs

1use std::fmt;
2
3use async_trait::async_trait;
4use futures::Stream;
5
6/// Represents a namespace for message broker subjects/topics
7#[derive(Debug, Clone, Default)]
8pub enum Namespace {
9    Custom(String),
10    #[default]
11    None,
12}
13
14impl fmt::Display for Namespace {
15    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16        match self {
17            Namespace::Custom(s) => write!(f, "{s}"),
18            Namespace::None => write!(f, "none"),
19        }
20    }
21}
22
23impl Namespace {
24    pub fn subject_name(&self, val: &str) -> String {
25        match self {
26            Namespace::Custom(s) => format!("{s}.{val}"),
27            Namespace::None => val.to_string(),
28        }
29    }
30
31    pub fn queue_name(&self, val: &str) -> String {
32        match self {
33            Namespace::Custom(s) => format!("{s}_{val}"),
34            Namespace::None => val.to_string(),
35        }
36    }
37}
38
39#[derive(Debug, thiserror::Error)]
40pub enum MessageBrokerError {
41    #[error("Failed to connect to broker: {0}")]
42    Connection(String),
43    #[error("Failed to setup broker infrastructure: {0}")]
44    Setup(String),
45    #[error("Failed to publish message: {0}")]
46    Publishing(String),
47    #[error("Failed to receive message: {0}")]
48    Receiving(String),
49    #[error("Failed to acknowledge message: {0}")]
50    Acknowledgment(String),
51    #[error("Failed to subscribe: {0}")]
52    Subscription(String),
53    #[error("Failed to flush: {0}")]
54    Flush(String),
55    #[error(transparent)]
56    Serde(#[from] serde_json::Error),
57    #[error(transparent)]
58    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
59    #[error(transparent)]
60    NatsSubscribe(#[from] async_nats::client::SubscribeError),
61    #[error(transparent)]
62    NatsPublish(
63        #[from] async_nats::error::Error<async_nats::client::PublishErrorKind>,
64    ),
65}
66
67#[async_trait]
68pub trait Message: std::fmt::Debug + Send + Sync {
69    fn payload(&self) -> Vec<u8>;
70    async fn ack(&self) -> Result<(), MessageBrokerError>;
71    fn id(&self) -> String;
72}
73
74pub type MessageBlockStream = Box<
75    dyn Stream<Item = Result<Box<dyn Message>, MessageBrokerError>>
76        + Send
77        + Unpin,
78>;
79
80pub type MessageStream = Box<
81    dyn Stream<Item = Result<bytes::Bytes, MessageBrokerError>> + Send + Unpin,
82>;