ame_bus/
message.rs

1use serde::de::Error;
2use std::sync::Arc;
3#[cfg(feature = "jetstream")]
4use async_nats::jetstream::Context;
5
6#[async_trait::async_trait]
7/// # NATS Message
8///
9/// Generic trait for NATS messages.
10pub trait NatsMessage
11where
12    Self: Sized + Send,
13{
14    /// Error type for serialization.
15    type SerError: std::error::Error + Send + Sync + 'static;
16
17    /// Error type for deserialization.
18    type DeError: std::error::Error + Send + Sync + 'static;
19
20    /// serialize message to bytes. Can be any format.
21    fn to_bytes(&self) -> Result<Arc<[u8]>, Self::SerError>;
22
23    /// parse message from bytes. Can be any format.
24    fn parse_from_bytes(bytes: impl AsRef<[u8]>) -> Result<Self, Self::DeError>;
25}
26
27impl<T: NatsJsonMessage> NatsMessage for T {
28    type SerError = serde_json::Error;
29    type DeError = serde_json::Error;
30
31    fn to_bytes(&self) -> Result<Arc<[u8]>, Self::SerError> {
32        self.to_json_bytes()
33    }
34
35    fn parse_from_bytes(bytes: impl AsRef<[u8]>) -> Result<Self, Self::DeError> {
36        Self::from_json_bytes(bytes)
37    }
38}
39
40/// # NATS JSON Message
41///
42/// Implement this trait will make the struct can be serialized and deserialized to JSON bytes.
43///
44/// Based on `serde_json` serialization and deserialization.
45///
46/// implement `NatsJsonMessage` will automatically implement `NatsMessage` for the type.
47pub trait NatsJsonMessage
48where
49    Self: serde::Serialize + for<'de> serde::de::Deserialize<'de> + Sized + Send,
50{
51    #[allow(missing_docs)]
52    fn to_json_bytes(&self) -> Result<Arc<[u8]>, serde_json::Error> {
53        let json = serde_json::to_string(self)?;
54        Ok(Arc::from(json.into_bytes()))
55    }
56    #[allow(missing_docs)]
57    fn from_json_bytes(bytes: impl AsRef<[u8]>) -> Result<Self, serde_json::Error> {
58        let json = std::str::from_utf8(bytes.as_ref())
59            .map_err(|_| serde_json::Error::custom("Failed to parse string from bytes"))?;
60        let rpc_request = serde_json::from_str(json)?;
61        Ok(rpc_request)
62    }
63}
64
65impl NatsJsonMessage for () {}
66impl NatsJsonMessage for serde_json::Value {}
67
68#[async_trait::async_trait]
69/// Message in NATS core (including service RPC).
70pub trait NatsCoreMessageSendTrait: NatsMessage + DynamicSubjectNatsMessage {
71
72    #[doc(hidden)]
73    /// Publish the message to the NATS server.
74    ///
75    /// **DO NOT OVERRIDE THIS FUNCTION.**
76    async fn publish(&self, nats: &async_nats::Client) -> anyhow::Result<()> {
77        let subject = self.subject();
78        let bytes = self.to_bytes()?;
79        nats.publish(subject, bytes.to_vec().into()).await?;
80        Ok(())
81    }
82}
83
84#[cfg(feature = "jetstream")]
85#[async_trait::async_trait]
86/// Message in NATS JetStream that can be published.
87pub trait JetStreamMessageSendTrait: NatsMessage + DynamicSubjectNatsMessage {
88    
89    #[doc(hidden)]
90    /// Publish the message to the NATS server.
91    /// 
92    /// **DO NOT OVERRIDE THIS FUNCTION.**
93    async fn publish(&self, js_context: &Context) -> anyhow::Result<()> {
94        js_context.publish(self.subject(), self.to_bytes()?.to_vec().into()).await?;
95        Ok(())
96    }
97}
98
99/// NATS Message that has a subject. 
100/// 
101/// Can be dynamic or static. Can be NATS core message or JetStream message.
102pub trait DynamicSubjectNatsMessage {
103    /// The subject of the message. Can be dynamic.
104    fn subject(&self) -> String;
105}
106
107/// NATS Message that has a subject.
108/// 
109/// Must be static. Can be NATS core message or JetStream message.
110pub trait StaticSubjectNatsMessage {
111    /// The subject of the message. Must be static.
112    fn subject() -> String;
113}
114
115impl<T> DynamicSubjectNatsMessage for T
116where
117    T: StaticSubjectNatsMessage,
118{
119    fn subject(&self) -> String {
120        T::subject()
121    }
122}