Skip to main content

topiq_protocol/
frame.rs

1use bytes::Bytes;
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3
/// Current wire protocol version.
///
/// NOTE(review): nothing in the visible frame encoding reads this constant;
/// presumably it is exchanged or checked elsewhere (e.g. during a handshake) —
/// confirm against the callers.
pub const PROTOCOL_VERSION: u8 = 1;
6
/// A frame on the wire. This is the unit of communication between client and broker.
///
/// `Serialize` and `Deserialize` are implemented by hand in this file rather
/// than derived: encoding goes through the private `FrameSerHelper` enum and
/// decoding through `FrameDeHelper`. Both helpers mirror the variants and
/// fields below one-to-one, so any variant or field that is added, renamed or
/// removed here must be mirrored in both helpers and their `From` impls.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Frame {
    /// Client -> Broker: publish a message to a topic.
    Publish {
        /// Topic the message is published to.
        topic: String,
        /// Message body as raw bytes (handled via `serde_bytes` in the serde helpers).
        payload: Bytes,
        /// Optional reply address. Presumably the topic on which the publisher
        /// expects responses — confirm against the broker implementation.
        reply_to: Option<String>,
    },

    /// Client -> Broker: subscribe to a subject pattern.
    Subscribe {
        /// Identifier for this subscription; the same `sid` field appears in
        /// `Unsubscribe` and `Message`.
        sid: u64,
        /// Subject pattern to subscribe to.
        subject: String,
        /// Optional queue group name. Presumably subscribers sharing a group
        /// split deliveries between them — confirm against the broker.
        queue_group: Option<String>,
    },

    /// Client -> Broker: unsubscribe from a subscription.
    ///
    /// `sid` identifies the subscription to drop.
    Unsubscribe { sid: u64 },

    /// Broker -> Client: deliver a message to a subscriber.
    Message {
        /// Topic associated with the delivered message.
        topic: String,
        /// Subscription identifier; presumably the `sid` of the matching
        /// `Subscribe` — confirm against the broker.
        sid: u64,
        /// Message body as raw bytes (handled via `serde_bytes` in the serde helpers).
        payload: Bytes,
        /// Optional reply address carried with the message; see the note on
        /// `Publish::reply_to`.
        reply_to: Option<String>,
    },

    /// Bidirectional: keepalive ping.
    Ping,

    /// Bidirectional: keepalive pong.
    Pong,

    /// Broker -> Client: operation succeeded.
    Ok,

    /// Broker -> Client: operation failed.
    ///
    /// `message` carries the error text.
    Err { message: String },
}
47
48// Serialize borrows from Frame to avoid payload copy and string clones.
49impl Serialize for Frame {
50    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
51        FrameSerHelper::from(self).serialize(serializer)
52    }
53}
54
55impl<'de> Deserialize<'de> for Frame {
56    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
57        FrameDeHelper::deserialize(deserializer).map(Frame::from)
58    }
59}
60
/// Serialize-only helper: borrows from Frame to avoid copies.
/// Uses `serde_bytes` for payload fields to encode as msgpack binary.
///
/// Mirrors the variants and fields of `Frame` one-to-one, holding references
/// (or the `Copy` `sid` value) instead of owned data. The derived `Serialize`
/// impl of this enum is what `Frame::serialize` actually emits, so variant
/// names, field names and field order should stay identical to
/// `FrameDeHelper`, which is used to read frames back.
#[derive(Serialize)]
enum FrameSerHelper<'a> {
    // Borrowed view of `Frame::Publish`.
    Publish {
        topic: &'a str,
        // Routed through `serde_bytes` so the slice is written as one binary
        // blob rather than element by element.
        #[serde(with = "serde_bytes")]
        payload: &'a [u8],
        reply_to: &'a Option<String>,
    },
    // Borrowed view of `Frame::Subscribe`.
    Subscribe {
        sid: u64,
        subject: &'a str,
        queue_group: &'a Option<String>,
    },
    // View of `Frame::Unsubscribe`; `sid` is copied, nothing is borrowed.
    Unsubscribe {
        sid: u64,
    },
    // Borrowed view of `Frame::Message`.
    Message {
        topic: &'a str,
        sid: u64,
        // Same binary treatment as `Publish::payload`.
        #[serde(with = "serde_bytes")]
        payload: &'a [u8],
        reply_to: &'a Option<String>,
    },
    // Field-less variants, mirroring `Frame::Ping` / `Pong` / `Ok`.
    Ping,
    Pong,
    Ok,
    // Borrowed view of `Frame::Err`.
    Err {
        message: &'a str,
    },
}
93
94impl<'a> From<&'a Frame> for FrameSerHelper<'a> {
95    fn from(frame: &'a Frame) -> Self {
96        match frame {
97            Frame::Publish {
98                topic,
99                payload,
100                reply_to,
101            } => FrameSerHelper::Publish {
102                topic,
103                payload: payload.as_ref(),
104                reply_to,
105            },
106            Frame::Subscribe {
107                sid,
108                subject,
109                queue_group,
110            } => FrameSerHelper::Subscribe {
111                sid: *sid,
112                subject,
113                queue_group,
114            },
115            Frame::Unsubscribe { sid } => FrameSerHelper::Unsubscribe { sid: *sid },
116            Frame::Message {
117                topic,
118                sid,
119                payload,
120                reply_to,
121            } => FrameSerHelper::Message {
122                topic,
123                sid: *sid,
124                payload: payload.as_ref(),
125                reply_to,
126            },
127            Frame::Ping => FrameSerHelper::Ping,
128            Frame::Pong => FrameSerHelper::Pong,
129            Frame::Ok => FrameSerHelper::Ok,
130            Frame::Err { message } => FrameSerHelper::Err { message },
131        }
132    }
133}
134
/// Deserialize-only helper: owns data, uses `serde_bytes` for binary decoding.
///
/// Mirrors the variants and fields of `Frame` one-to-one. `Frame::deserialize`
/// decodes into this enum first and then converts it with `Frame::from`, so the
/// derived `Deserialize` impl here is what defines the accepted input. Variant
/// names, field names and field order should stay identical to
/// `FrameSerHelper`, which produces the encoded frames.
#[derive(Deserialize)]
enum FrameDeHelper {
    // Owned counterpart of `Frame::Publish`.
    Publish {
        topic: String,
        // Decoded through `serde_bytes` into an owned buffer; converted to
        // `Bytes` when the helper is turned into a `Frame`.
        #[serde(with = "serde_bytes")]
        payload: Vec<u8>,
        reply_to: Option<String>,
    },
    // Owned counterpart of `Frame::Subscribe`.
    Subscribe {
        sid: u64,
        subject: String,
        queue_group: Option<String>,
    },
    // Owned counterpart of `Frame::Unsubscribe`.
    Unsubscribe {
        sid: u64,
    },
    // Owned counterpart of `Frame::Message`.
    Message {
        topic: String,
        sid: u64,
        // Same binary treatment as `Publish::payload`.
        #[serde(with = "serde_bytes")]
        payload: Vec<u8>,
        reply_to: Option<String>,
    },
    // Field-less variants, mirroring `Frame::Ping` / `Pong` / `Ok`.
    Ping,
    Pong,
    Ok,
    // Owned counterpart of `Frame::Err`.
    Err {
        message: String,
    },
}
166
167impl From<FrameDeHelper> for Frame {
168    fn from(helper: FrameDeHelper) -> Self {
169        match helper {
170            FrameDeHelper::Publish {
171                topic,
172                payload,
173                reply_to,
174            } => Frame::Publish {
175                topic,
176                payload: Bytes::from(payload),
177                reply_to,
178            },
179            FrameDeHelper::Subscribe {
180                sid,
181                subject,
182                queue_group,
183            } => Frame::Subscribe {
184                sid,
185                subject,
186                queue_group,
187            },
188            FrameDeHelper::Unsubscribe { sid } => Frame::Unsubscribe { sid },
189            FrameDeHelper::Message {
190                topic,
191                sid,
192                payload,
193                reply_to,
194            } => Frame::Message {
195                topic,
196                sid,
197                payload: Bytes::from(payload),
198                reply_to,
199            },
200            FrameDeHelper::Ping => Frame::Ping,
201            FrameDeHelper::Pong => Frame::Pong,
202            FrameDeHelper::Ok => Frame::Ok,
203            FrameDeHelper::Err { message } => Frame::Err { message },
204        }
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `frame` with `rmp_serde` and decodes the resulting bytes back.
    fn roundtrip(frame: &Frame) -> Frame {
        let wire = rmp_serde::to_vec(frame).unwrap();
        rmp_serde::from_slice(&wire).unwrap()
    }

    /// Asserts that `frame` comes back unchanged from an encode/decode cycle.
    fn assert_roundtrips(frame: Frame) {
        let decoded = roundtrip(&frame);
        assert_eq!(decoded, frame);
    }

    #[test]
    fn roundtrip_publish() {
        assert_roundtrips(Frame::Publish {
            topic: String::from("test.topic"),
            payload: Bytes::from_static(b"hello"),
            reply_to: None,
        });
    }

    #[test]
    fn roundtrip_publish_with_reply() {
        assert_roundtrips(Frame::Publish {
            topic: String::from("req"),
            payload: Bytes::from_static(b"data"),
            reply_to: Some(String::from("inbox.123")),
        });
    }

    #[test]
    fn roundtrip_subscribe() {
        assert_roundtrips(Frame::Subscribe {
            sid: 42,
            subject: String::from("sensors.>"),
            queue_group: Some(String::from("workers")),
        });
    }

    #[test]
    fn roundtrip_unsubscribe() {
        assert_roundtrips(Frame::Unsubscribe { sid: 7 });
    }

    #[test]
    fn roundtrip_message() {
        assert_roundtrips(Frame::Message {
            topic: String::from("sensors.temp"),
            sid: 1,
            payload: Bytes::from_static(b"25.3"),
            reply_to: None,
        });
    }

    #[test]
    fn roundtrip_ping_pong() {
        assert_roundtrips(Frame::Ping);
        assert_roundtrips(Frame::Pong);
    }

    #[test]
    fn roundtrip_ok() {
        assert_roundtrips(Frame::Ok);
    }

    #[test]
    fn roundtrip_err() {
        assert_roundtrips(Frame::Err {
            message: String::from("bad request"),
        });
    }

    #[test]
    fn empty_payload() {
        // A zero-length payload must survive the cycle as well.
        assert_roundtrips(Frame::Publish {
            topic: String::from("t"),
            payload: Bytes::new(),
            reply_to: None,
        });
    }
}