1use serde::de::Error;
2use std::sync::Arc;
3#[cfg(feature = "jetstream")]
4use async_nats::jetstream::Context;
5
6#[async_trait::async_trait]
7pub trait NatsMessage
11where
12 Self: Sized + Send,
13{
14 type SerError: std::error::Error + Send + Sync + 'static;
16
17 type DeError: std::error::Error + Send + Sync + 'static;
19
20 fn to_bytes(&self) -> Result<Arc<[u8]>, Self::SerError>;
22
23 fn parse_from_bytes(bytes: impl AsRef<[u8]>) -> Result<Self, Self::DeError>;
25}
26
27impl<T: NatsJsonMessage> NatsMessage for T {
28 type SerError = serde_json::Error;
29 type DeError = serde_json::Error;
30
31 fn to_bytes(&self) -> Result<Arc<[u8]>, Self::SerError> {
32 self.to_json_bytes()
33 }
34
35 fn parse_from_bytes(bytes: impl AsRef<[u8]>) -> Result<Self, Self::DeError> {
36 Self::from_json_bytes(bytes)
37 }
38}
39
40pub trait NatsJsonMessage
48where
49 Self: serde::Serialize + for<'de> serde::de::Deserialize<'de> + Sized + Send,
50{
51 #[allow(missing_docs)]
52 fn to_json_bytes(&self) -> Result<Arc<[u8]>, serde_json::Error> {
53 let json = serde_json::to_string(self)?;
54 Ok(Arc::from(json.into_bytes()))
55 }
56 #[allow(missing_docs)]
57 fn from_json_bytes(bytes: impl AsRef<[u8]>) -> Result<Self, serde_json::Error> {
58 let json = std::str::from_utf8(bytes.as_ref())
59 .map_err(|_| serde_json::Error::custom("Failed to parse string from bytes"))?;
60 let rpc_request = serde_json::from_str(json)?;
61 Ok(rpc_request)
62 }
63}
64
65impl NatsJsonMessage for () {}
66impl NatsJsonMessage for serde_json::Value {}
67
68#[async_trait::async_trait]
69pub trait NatsCoreMessageSendTrait: NatsMessage + DynamicSubjectNatsMessage {
71
72 #[doc(hidden)]
73 async fn publish(&self, nats: &async_nats::Client) -> anyhow::Result<()> {
77 let subject = self.subject();
78 let bytes = self.to_bytes()?;
79 nats.publish(subject, bytes.to_vec().into()).await?;
80 Ok(())
81 }
82}
83
84#[cfg(feature = "jetstream")]
85#[async_trait::async_trait]
86pub trait JetStreamMessageSendTrait: NatsMessage + DynamicSubjectNatsMessage {
88
89 #[doc(hidden)]
90 async fn publish(&self, js_context: &Context) -> anyhow::Result<()> {
94 js_context.publish(self.subject(), self.to_bytes()?.to_vec().into()).await?;
95 Ok(())
96 }
97}
98
99pub trait DynamicSubjectNatsMessage {
103 fn subject(&self) -> String;
105}
106
107pub trait StaticSubjectNatsMessage {
111 fn subject() -> String;
113}
114
115impl<T> DynamicSubjectNatsMessage for T
116where
117 T: StaticSubjectNatsMessage,
118{
119 fn subject(&self) -> String {
120 T::subject()
121 }
122}