// fuel_message_broker/msg_broker.rs
use std::fmt;

use async_trait::async_trait;
use futures::Stream;

/// Optional prefix used when building broker subject and queue names.
///
/// The default value is [`Namespace::None`], i.e. no prefix.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
pub enum Namespace {
    /// A caller-supplied prefix string.
    Custom(String),
    /// No prefix; names are used as given.
    #[default]
    None,
}
13
14impl fmt::Display for Namespace {
15 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
16 match self {
17 Namespace::Custom(s) => write!(f, "{s}"),
18 Namespace::None => write!(f, "none"),
19 }
20 }
21}
22
23impl Namespace {
24 pub fn subject_name(&self, val: &str) -> String {
25 match self {
26 Namespace::Custom(s) => format!("{s}.{val}"),
27 Namespace::None => val.to_string(),
28 }
29 }
30
31 pub fn queue_name(&self, val: &str) -> String {
32 match self {
33 Namespace::Custom(s) => format!("{s}_{val}"),
34 Namespace::None => val.to_string(),
35 }
36 }
37}
38
39#[derive(Debug, thiserror::Error)]
40pub enum MessageBrokerError {
41 #[error("Failed to connect to broker: {0}")]
42 Connection(String),
43 #[error("Failed to setup broker infrastructure: {0}")]
44 Setup(String),
45 #[error("Failed to publish message: {0}")]
46 Publishing(String),
47 #[error("Failed to receive message: {0}")]
48 Receiving(String),
49 #[error("Failed to acknowledge message: {0}")]
50 Acknowledgment(String),
51 #[error("Failed to subscribe: {0}")]
52 Subscription(String),
53 #[error("Failed to flush: {0}")]
54 Flush(String),
55 #[error(transparent)]
56 Serde(#[from] serde_json::Error),
57 #[error(transparent)]
58 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
59 #[error(transparent)]
60 NatsSubscribe(#[from] async_nats::client::SubscribeError),
61 #[error(transparent)]
62 NatsPublish(
63 #[from] async_nats::error::Error<async_nats::client::PublishErrorKind>,
64 ),
65}
66
67#[async_trait]
68pub trait Message: std::fmt::Debug + Send + Sync {
69 fn payload(&self) -> Vec<u8>;
70 async fn ack(&self) -> Result<(), MessageBrokerError>;
71 fn id(&self) -> String;
72}
73
74pub type MessageBlockStream = Box<
75 dyn Stream<Item = Result<Box<dyn Message>, MessageBrokerError>>
76 + Send
77 + Unpin,
78>;
79
80pub type MessageStream = Box<
81 dyn Stream<Item = Result<bytes::Bytes, MessageBrokerError>> + Send + Unpin,
82>;