Skip to main content

moq_transport/data/
subgroup.rs

1use crate::coding::{Decode, DecodeError, Encode, EncodeError};
2use crate::data::{ExtensionHeaders, ObjectStatus, StreamHeaderType};
3
4#[derive(Debug, Clone, Eq, PartialEq)]
5pub struct SubgroupHeader {
6    /// Subgroup Header Type
7    pub header_type: StreamHeaderType,
8
9    /// The track alias.
10    pub track_alias: u64,
11
12    /// The group sequence number
13    pub group_id: u64,
14
15    /// The subgroup sequence number
16    pub subgroup_id: Option<u64>,
17
18    /// Publisher priority, where **smaller** values are sent first.
19    pub publisher_priority: u8,
20}
21
22// Note:  Not using the Decode trait, since we need to know the header_type to properly parse this, and it
23//        is read before knowing we need to decode this.
24impl SubgroupHeader {
25    pub fn decode<R: bytes::Buf>(
26        header_type: StreamHeaderType,
27        r: &mut R,
28    ) -> Result<Self, DecodeError> {
29        tracing::trace!(
30            "[DECODE] SubgroupHeader: starting decode with header_type={:?}, buffer_remaining={} bytes",
31            header_type,
32            r.remaining()
33        );
34
35        let track_alias = u64::decode(r)?;
36        tracing::trace!("[DECODE] SubgroupHeader: track_alias={}", track_alias);
37
38        let group_id = u64::decode(r)?;
39        tracing::trace!("[DECODE] SubgroupHeader: group_id={}", group_id);
40
41        let subgroup_id = match header_type.has_subgroup_id() {
42            true => {
43                let id = u64::decode(r)?;
44                tracing::trace!("[DECODE] SubgroupHeader: subgroup_id={}", id);
45                Some(id)
46            }
47            false => {
48                tracing::trace!(
49                    "[DECODE] SubgroupHeader: subgroup_id=None (not present for this header type)"
50                );
51                None
52            }
53        };
54
55        let publisher_priority = u8::decode(r)?;
56        tracing::trace!(
57            "[DECODE] SubgroupHeader: publisher_priority={}, buffer_remaining={} bytes",
58            publisher_priority,
59            r.remaining()
60        );
61
62        let result = Self {
63            header_type,
64            track_alias,
65            group_id,
66            subgroup_id,
67            publisher_priority,
68        };
69
70        tracing::debug!(
71            "[DECODE] SubgroupHeader complete: track_alias={}, group_id={}, subgroup_id={:?}, priority={}",
72            result.track_alias,
73            result.group_id,
74            result.subgroup_id,
75            result.publisher_priority
76        );
77
78        Ok(result)
79    }
80}
81
82impl Encode for SubgroupHeader {
83    fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
84        tracing::trace!(
85            "[ENCODE] SubgroupHeader: starting encode - track_alias={}, group_id={}, subgroup_id={:?}, priority={}, header_type={:?}",
86            self.track_alias,
87            self.group_id,
88            self.subgroup_id,
89            self.publisher_priority,
90            self.header_type
91        );
92
93        let start_pos = w.remaining_mut();
94
95        self.header_type.encode(w)?;
96        tracing::trace!("[ENCODE] SubgroupHeader: encoded header_type");
97
98        self.track_alias.encode(w)?;
99        tracing::trace!(
100            "[ENCODE] SubgroupHeader: encoded track_alias={}",
101            self.track_alias
102        );
103
104        self.group_id.encode(w)?;
105        tracing::trace!(
106            "[ENCODE] SubgroupHeader: encoded group_id={}",
107            self.group_id
108        );
109
110        if self.header_type.has_subgroup_id() {
111            if let Some(subgroup_id) = self.subgroup_id {
112                subgroup_id.encode(w)?;
113                tracing::trace!(
114                    "[ENCODE] SubgroupHeader: encoded subgroup_id={}",
115                    subgroup_id
116                );
117            } else {
118                tracing::error!(
119                    "[ENCODE] SubgroupHeader: MISSING subgroup_id for header_type={:?}",
120                    self.header_type
121                );
122                return Err(EncodeError::MissingField("SubgroupId".to_string()));
123            }
124        } else {
125            tracing::trace!("[ENCODE] SubgroupHeader: subgroup_id not encoded (not required for this header type)");
126        }
127
128        self.publisher_priority.encode(w)?;
129        tracing::trace!(
130            "[ENCODE] SubgroupHeader: encoded publisher_priority={}",
131            self.publisher_priority
132        );
133
134        let bytes_written = start_pos - w.remaining_mut();
135        tracing::debug!(
136            "[ENCODE] SubgroupHeader complete: wrote {} bytes",
137            bytes_written
138        );
139
140        Ok(())
141    }
142}
143
144// Subgroup Object without Extension headers (version with ExtensionHeaders is below)
145#[derive(Debug, Clone, Eq, PartialEq)]
146pub struct SubgroupObject {
147    pub object_id_delta: u64,
148    pub payload_length: usize,
149    pub status: Option<ObjectStatus>,
150    //pub payload: bytes::Bytes,  // TODO SLG - payload is sent outside this right now - decide which way to go
151}
152
153impl Decode for SubgroupObject {
154    fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
155        tracing::trace!(
156            "[DECODE] SubgroupObject: starting decode, buffer_remaining={} bytes",
157            r.remaining()
158        );
159
160        let object_id_delta = u64::decode(r)?;
161        tracing::trace!(
162            "[DECODE] SubgroupObject: object_id_delta={}",
163            object_id_delta
164        );
165
166        let payload_length = usize::decode(r)?;
167        tracing::trace!("[DECODE] SubgroupObject: payload_length={}", payload_length);
168
169        let status = match payload_length {
170            0 => {
171                let s = ObjectStatus::decode(r)?;
172                tracing::trace!("[DECODE] SubgroupObject: status={:?} (payload_length=0)", s);
173                Some(s)
174            }
175            _ => {
176                tracing::trace!("[DECODE] SubgroupObject: status=None (payload_length > 0)");
177                None
178            }
179        };
180
181        //Self::decode_remaining(r, payload_length);
182        //let payload = r.copy_to_bytes(payload_length);
183
184        tracing::debug!(
185            "[DECODE] SubgroupObject complete: object_id_delta={}, payload_length={}, status={:?}, buffer_remaining={} bytes",
186            object_id_delta,
187            payload_length,
188            status,
189            r.remaining()
190        );
191
192        Ok(Self {
193            object_id_delta,
194            payload_length,
195            status,
196            //payload,
197        })
198    }
199}
200
201impl Encode for SubgroupObject {
202    fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
203        tracing::trace!(
204            "[ENCODE] SubgroupObject: starting encode - object_id_delta={}, payload_length={}, status={:?}",
205            self.object_id_delta,
206            self.payload_length,
207            self.status
208        );
209
210        self.object_id_delta.encode(w)?;
211        tracing::trace!(
212            "[ENCODE] SubgroupObject: encoded object_id_delta={}",
213            self.object_id_delta
214        );
215
216        self.payload_length.encode(w)?;
217        tracing::trace!(
218            "[ENCODE] SubgroupObject: encoded payload_length={}",
219            self.payload_length
220        );
221
222        if self.payload_length == 0 {
223            if let Some(status) = self.status {
224                status.encode(w)?;
225                tracing::trace!("[ENCODE] SubgroupObject: encoded status={:?}", status);
226            } else {
227                tracing::error!("[ENCODE] SubgroupObject: MISSING status for payload_length=0");
228                return Err(EncodeError::MissingField("Status".to_string()));
229            }
230        }
231        //Self::encode_remaining(w, self.payload.len())?;
232        //w.put_slice(&self.payload);
233
234        tracing::debug!("[ENCODE] SubgroupObject complete");
235
236        Ok(())
237    }
238}
239
240// Subgroup Object with Extension headers
241#[derive(Debug, Clone, Eq, PartialEq)]
242pub struct SubgroupObjectExt {
243    pub object_id_delta: u64,
244    pub extension_headers: ExtensionHeaders,
245    pub payload_length: usize,
246    pub status: Option<ObjectStatus>,
247    //pub payload: bytes::Bytes,  // TODO SLG - payload is sent outside this right now - decide which way to go
248}
249
250impl Decode for SubgroupObjectExt {
251    fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
252        tracing::trace!(
253            "[DECODE] SubgroupObjectExt: starting decode, buffer_remaining={} bytes",
254            r.remaining()
255        );
256
257        let object_id_delta = u64::decode(r)?;
258        tracing::trace!(
259            "[DECODE] SubgroupObjectExt: object_id_delta={}",
260            object_id_delta
261        );
262
263        let extension_headers = ExtensionHeaders::decode(r)?;
264        tracing::trace!(
265            "[DECODE] SubgroupObjectExt: extension_headers={:?}",
266            extension_headers
267        );
268
269        let payload_length = usize::decode(r)?;
270        tracing::trace!(
271            "[DECODE] SubgroupObjectExt: payload_length={}",
272            payload_length
273        );
274
275        let status = match payload_length {
276            0 => {
277                let s = ObjectStatus::decode(r)?;
278                tracing::trace!(
279                    "[DECODE] SubgroupObjectExt: status={:?} (payload_length=0)",
280                    s
281                );
282                Some(s)
283            }
284            _ => {
285                tracing::trace!("[DECODE] SubgroupObjectExt: status=None (payload_length > 0)");
286                None
287            }
288        };
289
290        //Self::decode_remaining(r, payload_length);
291        //let payload = r.copy_to_bytes(payload_length);
292
293        tracing::debug!(
294            "[DECODE] SubgroupObjectExt complete: object_id_delta={}, payload_length={}, status={:?}, buffer_remaining={} bytes",
295            object_id_delta,
296            payload_length,
297            status,
298            r.remaining()
299        );
300
301        Ok(Self {
302            object_id_delta,
303            extension_headers,
304            payload_length,
305            status,
306            //payload,
307        })
308    }
309}
310
311impl Encode for SubgroupObjectExt {
312    fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
313        tracing::trace!(
314            "[ENCODE] SubgroupObjectExt: starting encode - object_id_delta={}, payload_length={}, status={:?}, extension_headers={:?}",
315            self.object_id_delta,
316            self.payload_length,
317            self.status,
318            self.extension_headers
319        );
320
321        self.object_id_delta.encode(w)?;
322        tracing::trace!(
323            "[ENCODE] SubgroupObjectExt: encoded object_id_delta={}",
324            self.object_id_delta
325        );
326
327        self.extension_headers.encode(w)?;
328        tracing::trace!("[ENCODE] SubgroupObjectExt: encoded extension_headers");
329
330        self.payload_length.encode(w)?;
331        tracing::trace!(
332            "[ENCODE] SubgroupObjectExt: encoded payload_length={}",
333            self.payload_length
334        );
335
336        if self.payload_length == 0 {
337            if let Some(status) = self.status {
338                status.encode(w)?;
339                tracing::trace!("[ENCODE] SubgroupObjectExt: encoded status={:?}", status);
340            } else {
341                tracing::error!("[ENCODE] SubgroupObjectExt: MISSING status for payload_length=0");
342                return Err(EncodeError::MissingField("Status".to_string()));
343            }
344        }
345        //Self::encode_remaining(w, self.payload.len())?;
346        //w.put_slice(&self.payload);
347
348        tracing::debug!("[ENCODE] SubgroupObjectExt complete");
349
350        Ok(())
351    }
352}
353
354// TODO SLG - add more unit tests
355#[cfg(test)]
356mod tests {
357    use super::*;
358    use bytes::BytesMut;
359
360    #[test]
361    fn encode_decode_object() {
362        let mut buf = BytesMut::new();
363
364        let msg = SubgroupObject {
365            object_id_delta: 0,
366            payload_length: 7,
367            status: None,
368        };
369        msg.encode(&mut buf).unwrap();
370        let decoded = SubgroupObject::decode(&mut buf).unwrap();
371        assert_eq!(decoded, msg);
372    }
373
374    #[test]
375    fn encode_decode_object_ext() {
376        let mut buf = BytesMut::new();
377
378        // One ExtensionHeader for testing
379        let mut ext_hdrs = ExtensionHeaders::new();
380        ext_hdrs.set_bytesvalue(123, vec![0x00, 0x01, 0x02, 0x03]);
381
382        let msg = SubgroupObjectExt {
383            object_id_delta: 0,
384            extension_headers: ext_hdrs,
385            payload_length: 7,
386            status: None,
387        };
388        msg.encode(&mut buf).unwrap();
389        let decoded = SubgroupObjectExt::decode(&mut buf).unwrap();
390        assert_eq!(decoded, msg);
391    }
392}