moq_lite/ietf/
subscribe.rs

1//! IETF moq-transport-07 subscribe messages
2
3use std::borrow::Cow;
4
5use crate::{coding::*, Path};
6
7use super::util::{decode_namespace, encode_namespace};
8
// The only SUBSCRIBE filter type we support: Latest Group (0x1).
// `Subscribe::encode` always writes this value and `Subscribe::decode`
// rejects anything else with `DecodeError::InvalidValue`.
const FILTER_TYPE: u8 = 0x01;

// The only group order we ever advertise: descending (0x02).
// Written by both `Subscribe::encode` and `SubscribeOk::encode`.
const GROUP_ORDER: u8 = 0x02;
14
15/// Subscribe message (0x03)
16/// Sent by the subscriber to request all future objects for the given track.
17#[derive(Clone, Debug)]
18pub struct Subscribe<'a> {
19	pub subscribe_id: u64,
20	pub track_alias: u64,
21	pub track_namespace: Path<'a>,
22	pub track_name: Cow<'a, str>,
23	pub subscriber_priority: u8,
24}
25
26impl<'a> Message for Subscribe<'a> {
27	fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
28		let subscribe_id = u64::decode(r)?;
29		let track_alias = u64::decode(r)?;
30
31		// Decode namespace (tuple of strings)
32		let track_namespace = decode_namespace(r)?;
33
34		let track_name = Cow::<str>::decode(r)?;
35		let subscriber_priority = u8::decode(r)?;
36
37		let group_order = u8::decode(r)?;
38		if group_order != 0 && group_order != GROUP_ORDER {
39			return Err(DecodeError::InvalidValue);
40		}
41
42		let filter_type = u8::decode(r)?;
43		if filter_type != FILTER_TYPE {
44			return Err(DecodeError::InvalidValue);
45		}
46
47		let num_params = u8::decode(r)?;
48		if num_params != 0 {
49			return Err(DecodeError::InvalidValue);
50		}
51
52		Ok(Self {
53			subscribe_id,
54			track_alias,
55			track_namespace,
56			track_name,
57			subscriber_priority,
58		})
59	}
60
61	fn encode<W: bytes::BufMut>(&self, w: &mut W) {
62		self.subscribe_id.encode(w);
63		self.track_alias.encode(w);
64		encode_namespace(w, &self.track_namespace);
65		self.track_name.encode(w);
66		self.subscriber_priority.encode(w);
67		GROUP_ORDER.encode(w);
68		FILTER_TYPE.encode(w);
69		0u8.encode(w); // no parameters
70	}
71}
72
73/// SubscribeOk message (0x04)
74#[derive(Clone, Debug)]
75pub struct SubscribeOk {
76	pub subscribe_id: u64,
77	/// Largest group/object ID tuple
78	pub largest: Option<(u64, u64)>,
79}
80
81impl Message for SubscribeOk {
82	fn encode<W: bytes::BufMut>(&self, w: &mut W) {
83		self.subscribe_id.encode(w);
84		0u8.encode(w); // expires = 0
85		GROUP_ORDER.encode(w);
86
87		if let Some((group, object)) = self.largest {
88			1u8.encode(w); // content exists
89			group.encode(w);
90			object.encode(w);
91		} else {
92			0u8.encode(w); // no content
93		}
94
95		0u8.encode(w); // no parameters
96	}
97
98	fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
99		let subscribe_id = u64::decode(r)?;
100
101		let expires = u64::decode(r)?;
102		if expires != 0 {
103			return Err(DecodeError::InvalidValue);
104		}
105
106		let _group_order = u8::decode(r)?; // Don't care about group order
107
108		let mut largest = None;
109		let content_exists = u8::decode(r)?;
110		if content_exists == 1 {
111			let group = u64::decode(r)?;
112			let object = u64::decode(r)?;
113			largest = Some((group, object));
114		} else if content_exists != 0 {
115			return Err(DecodeError::InvalidValue);
116		}
117
118		let num_params = u8::decode(r)?;
119		if num_params != 0 {
120			return Err(DecodeError::InvalidValue);
121		}
122
123		Ok(Self { subscribe_id, largest })
124	}
125}
126
127/// SubscribeError message (0x05)
128#[derive(Clone, Debug)]
129pub struct SubscribeError<'a> {
130	pub subscribe_id: u64,
131	pub error_code: u64,
132	pub reason_phrase: Cow<'a, str>,
133	pub track_alias: u64,
134}
135
136impl<'a> Message for SubscribeError<'a> {
137	fn encode<W: bytes::BufMut>(&self, w: &mut W) {
138		self.subscribe_id.encode(w);
139		self.error_code.encode(w);
140		self.reason_phrase.encode(w);
141		self.track_alias.encode(w);
142	}
143
144	fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
145		let subscribe_id = u64::decode(r)?;
146		let error_code = u64::decode(r)?;
147		let reason_phrase = Cow::<str>::decode(r)?;
148		let track_alias = u64::decode(r)?;
149
150		Ok(Self {
151			subscribe_id,
152			error_code,
153			reason_phrase,
154			track_alias,
155		})
156	}
157}
158
159/// Unsubscribe message (0x0a)
160#[derive(Clone, Debug)]
161pub struct Unsubscribe {
162	pub subscribe_id: u64,
163}
164
165impl Message for Unsubscribe {
166	fn encode<W: bytes::BufMut>(&self, w: &mut W) {
167		self.subscribe_id.encode(w);
168	}
169
170	fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
171		let subscribe_id = u64::decode(r)?;
172		Ok(Self { subscribe_id })
173	}
174}
175
176/// SubscribeDone message (0x0b)
177#[derive(Clone, Debug)]
178pub struct SubscribeDone<'a> {
179	pub subscribe_id: u64,
180	pub status_code: u64,
181	pub reason_phrase: Cow<'a, str>,
182	pub final_group_object: Option<(u64, u64)>,
183}
184
185impl<'a> Message for SubscribeDone<'a> {
186	fn encode<W: bytes::BufMut>(&self, w: &mut W) {
187		self.subscribe_id.encode(w);
188		self.status_code.encode(w);
189		self.reason_phrase.encode(w);
190
191		if let Some((group, object)) = self.final_group_object {
192			1u8.encode(w); // content exists
193			group.encode(w);
194			object.encode(w);
195		} else {
196			0u8.encode(w); // no content
197		}
198	}
199
200	fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
201		let subscribe_id = u64::decode(r)?;
202		let status_code = u64::decode(r)?;
203		let reason_phrase = Cow::<str>::decode(r)?;
204
205		let mut final_group_object = None;
206		let content_exists = u64::decode(r)?;
207		if content_exists == 1 {
208			let group = u64::decode(r)?;
209			let object = u64::decode(r)?;
210			final_group_object = Some((group, object));
211		} else if content_exists != 0 {
212			return Err(DecodeError::InvalidValue);
213		}
214
215		Ok(Self {
216			subscribe_id,
217			status_code,
218			reason_phrase,
219			final_group_object,
220		})
221	}
222}
223
#[cfg(test)]
mod tests {
	use super::*;
	use bytes::BytesMut;

	/// Encodes `msg` into a fresh buffer and returns the bytes written.
	fn encode_message<M: Message>(msg: &M) -> Vec<u8> {
		let mut out = BytesMut::new();
		msg.encode(&mut out);
		out.to_vec()
	}

	/// Decodes a message of type `M` from a copy of `bytes`.
	fn decode_message<M: Message>(bytes: &[u8]) -> Result<M, DecodeError> {
		let mut input = bytes::Bytes::from(bytes.to_vec());
		M::decode(&mut input)
	}

	/// Encodes `msg` and decodes the result again, panicking if decoding fails.
	fn round_trip<M: Message>(msg: &M) -> M {
		let wire = encode_message(msg);
		decode_message(&wire).unwrap()
	}

	#[test]
	fn test_subscribe_round_trip() {
		let original = Subscribe {
			subscribe_id: 1,
			track_alias: 2,
			track_namespace: Path::new("test"),
			track_name: "video".into(),
			subscriber_priority: 128,
		};

		let Subscribe {
			subscribe_id,
			track_alias,
			track_namespace,
			track_name,
			subscriber_priority,
		} = round_trip(&original);

		assert_eq!(subscribe_id, 1);
		assert_eq!(track_alias, 2);
		assert_eq!(track_namespace.as_str(), "test");
		assert_eq!(track_name, "video");
		assert_eq!(subscriber_priority, 128);
	}

	#[test]
	fn test_subscribe_nested_namespace() {
		let original = Subscribe {
			subscribe_id: 100,
			track_alias: 200,
			track_namespace: Path::new("conference/room123"),
			track_name: "audio".into(),
			subscriber_priority: 255,
		};

		let decoded = round_trip(&original);

		assert_eq!(decoded.track_namespace.as_str(), "conference/room123");
	}

	#[test]
	fn test_subscribe_ok_with_largest() {
		let original = SubscribeOk {
			subscribe_id: 42,
			largest: Some((10, 20)),
		};

		let SubscribeOk { subscribe_id, largest } = round_trip(&original);

		assert_eq!(subscribe_id, 42);
		assert_eq!(largest, Some((10, 20)));
	}

	#[test]
	fn test_subscribe_ok_without_largest() {
		let original = SubscribeOk {
			subscribe_id: 42,
			largest: None,
		};

		let SubscribeOk { subscribe_id, largest } = round_trip(&original);

		assert_eq!(subscribe_id, 42);
		assert_eq!(largest, None);
	}

	#[test]
	fn test_subscribe_error() {
		let original = SubscribeError {
			subscribe_id: 123,
			error_code: 500,
			reason_phrase: "Not found".into(),
			track_alias: 456,
		};

		let SubscribeError {
			subscribe_id,
			error_code,
			reason_phrase,
			track_alias,
		} = round_trip(&original);

		assert_eq!(subscribe_id, 123);
		assert_eq!(error_code, 500);
		assert_eq!(reason_phrase, "Not found");
		assert_eq!(track_alias, 456);
	}

	#[test]
	fn test_unsubscribe() {
		let decoded = round_trip(&Unsubscribe { subscribe_id: 999 });

		assert_eq!(decoded.subscribe_id, 999);
	}

	#[test]
	fn test_subscribe_done_with_final() {
		let original = SubscribeDone {
			subscribe_id: 10,
			status_code: 0,
			reason_phrase: "complete".into(),
			final_group_object: Some((5, 10)),
		};

		let SubscribeDone {
			subscribe_id,
			status_code,
			reason_phrase,
			final_group_object,
		} = round_trip(&original);

		assert_eq!(subscribe_id, 10);
		assert_eq!(status_code, 0);
		assert_eq!(reason_phrase, "complete");
		assert_eq!(final_group_object, Some((5, 10)));
	}

	#[test]
	fn test_subscribe_done_without_final() {
		let original = SubscribeDone {
			subscribe_id: 10,
			status_code: 1,
			reason_phrase: "error".into(),
			final_group_object: None,
		};

		let decoded = round_trip(&original);

		assert_eq!(decoded.final_group_object, None);
	}

	#[test]
	fn test_subscribe_rejects_invalid_filter_type() {
		// A well-formed Subscribe up to the filter type, which is not one we support.
		#[rustfmt::skip]
		let wire: [u8; 16] = [
			0x01, // subscribe_id
			0x02, // track_alias
			0x01, // namespace length
			0x04, 0x74, 0x65, 0x73, 0x74, // "test"
			0x05, 0x76, 0x69, 0x64, 0x65, 0x6f, // "video"
			0x80, // subscriber_priority
			0x02, // group_order
		];
		#[rustfmt::skip]
		let trailer: [u8; 2] = [
			0x99, // INVALID filter_type
			0x00, // num_params
		];
		let invalid_bytes = [&wire[..], &trailer[..]].concat();

		let outcome: Result<Subscribe, _> = decode_message(&invalid_bytes);
		assert!(outcome.is_err());
	}

	#[test]
	fn test_subscribe_ok_rejects_non_zero_expires() {
		#[rustfmt::skip]
		let invalid_bytes: [u8; 5] = [
			0x01, // subscribe_id
			0x05, // INVALID: expires = 5
			0x02, // group_order
			0x00, // content_exists
			0x00, // num_params
		];

		let outcome: Result<SubscribeOk, _> = decode_message(&invalid_bytes);
		assert!(outcome.is_err());
	}
}