1use std::borrow::Cow;
4
5use crate::{coding::*, Path};
6
7use super::util::{decode_namespace, encode_namespace};
8
9const FILTER_TYPE: u8 = 0x01;
11
12const GROUP_ORDER: u8 = 0x02;
14
15#[derive(Clone, Debug)]
18pub struct Subscribe<'a> {
19 pub subscribe_id: u64,
20 pub track_alias: u64,
21 pub track_namespace: Path<'a>,
22 pub track_name: Cow<'a, str>,
23 pub subscriber_priority: u8,
24}
25
26impl<'a> Message for Subscribe<'a> {
27 fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
28 let subscribe_id = u64::decode(r)?;
29 let track_alias = u64::decode(r)?;
30
31 let track_namespace = decode_namespace(r)?;
33
34 let track_name = Cow::<str>::decode(r)?;
35 let subscriber_priority = u8::decode(r)?;
36
37 let group_order = u8::decode(r)?;
38 if group_order != 0 && group_order != GROUP_ORDER {
39 return Err(DecodeError::InvalidValue);
40 }
41
42 let filter_type = u8::decode(r)?;
43 if filter_type != FILTER_TYPE {
44 return Err(DecodeError::InvalidValue);
45 }
46
47 let num_params = u8::decode(r)?;
48 if num_params != 0 {
49 return Err(DecodeError::InvalidValue);
50 }
51
52 Ok(Self {
53 subscribe_id,
54 track_alias,
55 track_namespace,
56 track_name,
57 subscriber_priority,
58 })
59 }
60
61 fn encode<W: bytes::BufMut>(&self, w: &mut W) {
62 self.subscribe_id.encode(w);
63 self.track_alias.encode(w);
64 encode_namespace(w, &self.track_namespace);
65 self.track_name.encode(w);
66 self.subscriber_priority.encode(w);
67 GROUP_ORDER.encode(w);
68 FILTER_TYPE.encode(w);
69 0u8.encode(w); }
71}
72
73#[derive(Clone, Debug)]
75pub struct SubscribeOk {
76 pub subscribe_id: u64,
77 pub largest: Option<(u64, u64)>,
79}
80
81impl Message for SubscribeOk {
82 fn encode<W: bytes::BufMut>(&self, w: &mut W) {
83 self.subscribe_id.encode(w);
84 0u8.encode(w); GROUP_ORDER.encode(w);
86
87 if let Some((group, object)) = self.largest {
88 1u8.encode(w); group.encode(w);
90 object.encode(w);
91 } else {
92 0u8.encode(w); }
94
95 0u8.encode(w); }
97
98 fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
99 let subscribe_id = u64::decode(r)?;
100
101 let expires = u64::decode(r)?;
102 if expires != 0 {
103 return Err(DecodeError::InvalidValue);
104 }
105
106 let _group_order = u8::decode(r)?; let mut largest = None;
109 let content_exists = u8::decode(r)?;
110 if content_exists == 1 {
111 let group = u64::decode(r)?;
112 let object = u64::decode(r)?;
113 largest = Some((group, object));
114 } else if content_exists != 0 {
115 return Err(DecodeError::InvalidValue);
116 }
117
118 let num_params = u8::decode(r)?;
119 if num_params != 0 {
120 return Err(DecodeError::InvalidValue);
121 }
122
123 Ok(Self { subscribe_id, largest })
124 }
125}
126
127#[derive(Clone, Debug)]
129pub struct SubscribeError<'a> {
130 pub subscribe_id: u64,
131 pub error_code: u64,
132 pub reason_phrase: Cow<'a, str>,
133 pub track_alias: u64,
134}
135
136impl<'a> Message for SubscribeError<'a> {
137 fn encode<W: bytes::BufMut>(&self, w: &mut W) {
138 self.subscribe_id.encode(w);
139 self.error_code.encode(w);
140 self.reason_phrase.encode(w);
141 self.track_alias.encode(w);
142 }
143
144 fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
145 let subscribe_id = u64::decode(r)?;
146 let error_code = u64::decode(r)?;
147 let reason_phrase = Cow::<str>::decode(r)?;
148 let track_alias = u64::decode(r)?;
149
150 Ok(Self {
151 subscribe_id,
152 error_code,
153 reason_phrase,
154 track_alias,
155 })
156 }
157}
158
159#[derive(Clone, Debug)]
161pub struct Unsubscribe {
162 pub subscribe_id: u64,
163}
164
165impl Message for Unsubscribe {
166 fn encode<W: bytes::BufMut>(&self, w: &mut W) {
167 self.subscribe_id.encode(w);
168 }
169
170 fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
171 let subscribe_id = u64::decode(r)?;
172 Ok(Self { subscribe_id })
173 }
174}
175
176#[derive(Clone, Debug)]
178pub struct SubscribeDone<'a> {
179 pub subscribe_id: u64,
180 pub status_code: u64,
181 pub reason_phrase: Cow<'a, str>,
182 pub final_group_object: Option<(u64, u64)>,
183}
184
185impl<'a> Message for SubscribeDone<'a> {
186 fn encode<W: bytes::BufMut>(&self, w: &mut W) {
187 self.subscribe_id.encode(w);
188 self.status_code.encode(w);
189 self.reason_phrase.encode(w);
190
191 if let Some((group, object)) = self.final_group_object {
192 1u8.encode(w); group.encode(w);
194 object.encode(w);
195 } else {
196 0u8.encode(w); }
198 }
199
200 fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
201 let subscribe_id = u64::decode(r)?;
202 let status_code = u64::decode(r)?;
203 let reason_phrase = Cow::<str>::decode(r)?;
204
205 let mut final_group_object = None;
206 let content_exists = u64::decode(r)?;
207 if content_exists == 1 {
208 let group = u64::decode(r)?;
209 let object = u64::decode(r)?;
210 final_group_object = Some((group, object));
211 } else if content_exists != 0 {
212 return Err(DecodeError::InvalidValue);
213 }
214
215 Ok(Self {
216 subscribe_id,
217 status_code,
218 reason_phrase,
219 final_group_object,
220 })
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227 use bytes::BytesMut;
228
229 fn encode_message<M: Message>(msg: &M) -> Vec<u8> {
230 let mut buf = BytesMut::new();
231 msg.encode(&mut buf);
232 buf.to_vec()
233 }
234
235 fn decode_message<M: Message>(bytes: &[u8]) -> Result<M, DecodeError> {
236 let mut buf = bytes::Bytes::from(bytes.to_vec());
237 M::decode(&mut buf)
238 }
239
240 #[test]
241 fn test_subscribe_round_trip() {
242 let msg = Subscribe {
243 subscribe_id: 1,
244 track_alias: 2,
245 track_namespace: Path::new("test"),
246 track_name: "video".into(),
247 subscriber_priority: 128,
248 };
249
250 let encoded = encode_message(&msg);
251 let decoded: Subscribe = decode_message(&encoded).unwrap();
252
253 assert_eq!(decoded.subscribe_id, 1);
254 assert_eq!(decoded.track_alias, 2);
255 assert_eq!(decoded.track_namespace.as_str(), "test");
256 assert_eq!(decoded.track_name, "video");
257 assert_eq!(decoded.subscriber_priority, 128);
258 }
259
260 #[test]
261 fn test_subscribe_nested_namespace() {
262 let msg = Subscribe {
263 subscribe_id: 100,
264 track_alias: 200,
265 track_namespace: Path::new("conference/room123"),
266 track_name: "audio".into(),
267 subscriber_priority: 255,
268 };
269
270 let encoded = encode_message(&msg);
271 let decoded: Subscribe = decode_message(&encoded).unwrap();
272
273 assert_eq!(decoded.track_namespace.as_str(), "conference/room123");
274 }
275
276 #[test]
277 fn test_subscribe_ok_with_largest() {
278 let msg = SubscribeOk {
279 subscribe_id: 42,
280 largest: Some((10, 20)),
281 };
282
283 let encoded = encode_message(&msg);
284 let decoded: SubscribeOk = decode_message(&encoded).unwrap();
285
286 assert_eq!(decoded.subscribe_id, 42);
287 assert_eq!(decoded.largest, Some((10, 20)));
288 }
289
290 #[test]
291 fn test_subscribe_ok_without_largest() {
292 let msg = SubscribeOk {
293 subscribe_id: 42,
294 largest: None,
295 };
296
297 let encoded = encode_message(&msg);
298 let decoded: SubscribeOk = decode_message(&encoded).unwrap();
299
300 assert_eq!(decoded.subscribe_id, 42);
301 assert_eq!(decoded.largest, None);
302 }
303
304 #[test]
305 fn test_subscribe_error() {
306 let msg = SubscribeError {
307 subscribe_id: 123,
308 error_code: 500,
309 reason_phrase: "Not found".into(),
310 track_alias: 456,
311 };
312
313 let encoded = encode_message(&msg);
314 let decoded: SubscribeError = decode_message(&encoded).unwrap();
315
316 assert_eq!(decoded.subscribe_id, 123);
317 assert_eq!(decoded.error_code, 500);
318 assert_eq!(decoded.reason_phrase, "Not found");
319 assert_eq!(decoded.track_alias, 456);
320 }
321
322 #[test]
323 fn test_unsubscribe() {
324 let msg = Unsubscribe { subscribe_id: 999 };
325
326 let encoded = encode_message(&msg);
327 let decoded: Unsubscribe = decode_message(&encoded).unwrap();
328
329 assert_eq!(decoded.subscribe_id, 999);
330 }
331
332 #[test]
333 fn test_subscribe_done_with_final() {
334 let msg = SubscribeDone {
335 subscribe_id: 10,
336 status_code: 0,
337 reason_phrase: "complete".into(),
338 final_group_object: Some((5, 10)),
339 };
340
341 let encoded = encode_message(&msg);
342 let decoded: SubscribeDone = decode_message(&encoded).unwrap();
343
344 assert_eq!(decoded.subscribe_id, 10);
345 assert_eq!(decoded.status_code, 0);
346 assert_eq!(decoded.reason_phrase, "complete");
347 assert_eq!(decoded.final_group_object, Some((5, 10)));
348 }
349
350 #[test]
351 fn test_subscribe_done_without_final() {
352 let msg = SubscribeDone {
353 subscribe_id: 10,
354 status_code: 1,
355 reason_phrase: "error".into(),
356 final_group_object: None,
357 };
358
359 let encoded = encode_message(&msg);
360 let decoded: SubscribeDone = decode_message(&encoded).unwrap();
361
362 assert_eq!(decoded.final_group_object, None);
363 }
364
365 #[test]
366 fn test_subscribe_rejects_invalid_filter_type() {
367 #[rustfmt::skip]
368 let invalid_bytes = vec![
369 0x01, 0x02, 0x01, 0x04, 0x74, 0x65, 0x73, 0x74, 0x05, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x80, 0x02, 0x99, 0x00, ];
379
380 let result: Result<Subscribe, _> = decode_message(&invalid_bytes);
381 assert!(result.is_err());
382 }
383
384 #[test]
385 fn test_subscribe_ok_rejects_non_zero_expires() {
386 #[rustfmt::skip]
387 let invalid_bytes = vec![
388 0x01, 0x05, 0x02, 0x00, 0x00, ];
394
395 let result: Result<SubscribeOk, _> = decode_message(&invalid_bytes);
396 assert!(result.is_err());
397 }
398}