1use bytes::Bytes;
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3
/// Wire protocol version identifier.
///
/// NOTE(review): this constant is not referenced in this part of the file;
/// where it is sent or checked should be confirmed against its callers.
pub const PROTOCOL_VERSION: u8 = 1;
6
7#[derive(Debug, Clone, PartialEq, Eq)]
9pub enum Frame {
10 Publish {
12 topic: String,
13 payload: Bytes,
14 reply_to: Option<String>,
15 },
16
17 Subscribe {
19 sid: u64,
20 subject: String,
21 queue_group: Option<String>,
22 },
23
24 Unsubscribe { sid: u64 },
26
27 Message {
29 topic: String,
30 sid: u64,
31 payload: Bytes,
32 reply_to: Option<String>,
33 },
34
35 Ping,
37
38 Pong,
40
41 Ok,
43
44 Err { message: String },
46}
47
48impl Serialize for Frame {
50 fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
51 FrameSerHelper::from(self).serialize(serializer)
52 }
53}
54
55impl<'de> Deserialize<'de> for Frame {
56 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
57 FrameDeHelper::deserialize(deserializer).map(Frame::from)
58 }
59}
60
61#[derive(Serialize)]
64enum FrameSerHelper<'a> {
65 Publish {
66 topic: &'a str,
67 #[serde(with = "serde_bytes")]
68 payload: &'a [u8],
69 reply_to: &'a Option<String>,
70 },
71 Subscribe {
72 sid: u64,
73 subject: &'a str,
74 queue_group: &'a Option<String>,
75 },
76 Unsubscribe {
77 sid: u64,
78 },
79 Message {
80 topic: &'a str,
81 sid: u64,
82 #[serde(with = "serde_bytes")]
83 payload: &'a [u8],
84 reply_to: &'a Option<String>,
85 },
86 Ping,
87 Pong,
88 Ok,
89 Err {
90 message: &'a str,
91 },
92}
93
94impl<'a> From<&'a Frame> for FrameSerHelper<'a> {
95 fn from(frame: &'a Frame) -> Self {
96 match frame {
97 Frame::Publish {
98 topic,
99 payload,
100 reply_to,
101 } => FrameSerHelper::Publish {
102 topic,
103 payload: payload.as_ref(),
104 reply_to,
105 },
106 Frame::Subscribe {
107 sid,
108 subject,
109 queue_group,
110 } => FrameSerHelper::Subscribe {
111 sid: *sid,
112 subject,
113 queue_group,
114 },
115 Frame::Unsubscribe { sid } => FrameSerHelper::Unsubscribe { sid: *sid },
116 Frame::Message {
117 topic,
118 sid,
119 payload,
120 reply_to,
121 } => FrameSerHelper::Message {
122 topic,
123 sid: *sid,
124 payload: payload.as_ref(),
125 reply_to,
126 },
127 Frame::Ping => FrameSerHelper::Ping,
128 Frame::Pong => FrameSerHelper::Pong,
129 Frame::Ok => FrameSerHelper::Ok,
130 Frame::Err { message } => FrameSerHelper::Err { message },
131 }
132 }
133}
134
135#[derive(Deserialize)]
137enum FrameDeHelper {
138 Publish {
139 topic: String,
140 #[serde(with = "serde_bytes")]
141 payload: Vec<u8>,
142 reply_to: Option<String>,
143 },
144 Subscribe {
145 sid: u64,
146 subject: String,
147 queue_group: Option<String>,
148 },
149 Unsubscribe {
150 sid: u64,
151 },
152 Message {
153 topic: String,
154 sid: u64,
155 #[serde(with = "serde_bytes")]
156 payload: Vec<u8>,
157 reply_to: Option<String>,
158 },
159 Ping,
160 Pong,
161 Ok,
162 Err {
163 message: String,
164 },
165}
166
167impl From<FrameDeHelper> for Frame {
168 fn from(helper: FrameDeHelper) -> Self {
169 match helper {
170 FrameDeHelper::Publish {
171 topic,
172 payload,
173 reply_to,
174 } => Frame::Publish {
175 topic,
176 payload: Bytes::from(payload),
177 reply_to,
178 },
179 FrameDeHelper::Subscribe {
180 sid,
181 subject,
182 queue_group,
183 } => Frame::Subscribe {
184 sid,
185 subject,
186 queue_group,
187 },
188 FrameDeHelper::Unsubscribe { sid } => Frame::Unsubscribe { sid },
189 FrameDeHelper::Message {
190 topic,
191 sid,
192 payload,
193 reply_to,
194 } => Frame::Message {
195 topic,
196 sid,
197 payload: Bytes::from(payload),
198 reply_to,
199 },
200 FrameDeHelper::Ping => Frame::Ping,
201 FrameDeHelper::Pong => Frame::Pong,
202 FrameDeHelper::Ok => Frame::Ok,
203 FrameDeHelper::Err { message } => Frame::Err { message },
204 }
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `frame` with `rmp_serde` and decodes the bytes back into a
    /// `Frame`, panicking with a descriptive message if either step fails.
    fn roundtrip(frame: &Frame) -> Frame {
        let encoded = rmp_serde::to_vec(frame).expect("frame should encode");
        rmp_serde::from_slice(&encoded).expect("encoded frame should decode")
    }

    #[test]
    fn roundtrip_publish() {
        let frame = Frame::Publish {
            topic: "test.topic".into(),
            payload: Bytes::from("hello"),
            reply_to: None,
        };
        assert_eq!(roundtrip(&frame), frame);
    }

    #[test]
    fn roundtrip_publish_with_reply() {
        let frame = Frame::Publish {
            topic: "req".into(),
            payload: Bytes::from("data"),
            reply_to: Some("inbox.123".into()),
        };
        assert_eq!(roundtrip(&frame), frame);
    }

    #[test]
    fn roundtrip_subscribe() {
        let frame = Frame::Subscribe {
            sid: 42,
            subject: "sensors.>".into(),
            queue_group: Some("workers".into()),
        };
        assert_eq!(roundtrip(&frame), frame);
    }

    #[test]
    fn roundtrip_subscribe_without_queue_group() {
        let frame = Frame::Subscribe {
            sid: 43,
            subject: "sensors.temp".into(),
            queue_group: None,
        };
        assert_eq!(roundtrip(&frame), frame);
    }

    #[test]
    fn roundtrip_unsubscribe() {
        let frame = Frame::Unsubscribe { sid: 7 };
        assert_eq!(roundtrip(&frame), frame);
    }

    #[test]
    fn roundtrip_message() {
        let frame = Frame::Message {
            topic: "sensors.temp".into(),
            sid: 1,
            payload: Bytes::from("25.3"),
            reply_to: None,
        };
        assert_eq!(roundtrip(&frame), frame);
    }

    #[test]
    fn roundtrip_message_with_reply() {
        let frame = Frame::Message {
            topic: "req".into(),
            sid: 2,
            payload: Bytes::from("data"),
            reply_to: Some("inbox.123".into()),
        };
        assert_eq!(roundtrip(&frame), frame);
    }

    #[test]
    fn roundtrip_ping_pong() {
        assert_eq!(roundtrip(&Frame::Ping), Frame::Ping);
        assert_eq!(roundtrip(&Frame::Pong), Frame::Pong);
    }

    #[test]
    fn roundtrip_ok() {
        assert_eq!(roundtrip(&Frame::Ok), Frame::Ok);
    }

    #[test]
    fn roundtrip_err() {
        let frame = Frame::Err {
            message: "bad request".into(),
        };
        assert_eq!(roundtrip(&frame), frame);
    }

    #[test]
    fn empty_payload() {
        let frame = Frame::Publish {
            topic: "t".into(),
            payload: Bytes::new(),
            reply_to: None,
        };
        assert_eq!(roundtrip(&frame), frame);
    }

    /// Payloads are opaque bytes, so bytes that are not valid UTF-8 must
    /// survive the round trip unchanged.
    #[test]
    fn binary_payload() {
        let frame = Frame::Publish {
            topic: "bin".into(),
            payload: Bytes::from(vec![0x00, 0xff, 0x80, 0xfe]),
            reply_to: None,
        };
        assert_eq!(roundtrip(&frame), frame);
    }
}