Skip to main content

moq_transport/message/
fetch.rs

1use crate::coding::{
2    Decode, DecodeError, Encode, EncodeError, KeyValuePairs, Location, TrackNamespace,
3};
4use crate::message::{FetchType, GroupOrder};
5
6#[derive(Clone, Debug, Eq, PartialEq)]
7pub struct StandaloneFetch {
8    pub track_namespace: TrackNamespace,
9    pub track_name: String,
10    pub start_location: Location,
11    pub end_location: Location,
12}
13
14impl Decode for StandaloneFetch {
15    fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
16        let track_namespace = TrackNamespace::decode(r)?;
17        let track_name = String::decode(r)?;
18        let start_location = Location::decode(r)?;
19        let end_location = Location::decode(r)?;
20
21        Ok(Self {
22            track_namespace,
23            track_name,
24            start_location,
25            end_location,
26        })
27    }
28}
29
30impl Encode for StandaloneFetch {
31    fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
32        self.track_namespace.encode(w)?;
33        self.track_name.encode(w)?;
34        self.start_location.encode(w)?;
35        self.end_location.encode(w)?;
36
37        Ok(())
38    }
39}
40
/// Payload carried by a [`Fetch`] whose `fetch_type` is
/// `FetchType::RelativeJoining` or `FetchType::AbsoluteJoining`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoiningFetch {
    /// Request ID of the existing subscription that this fetch joins.
    pub joining_request_id: u64,
    /// Start value for the join.
    // NOTE(review): presumably interpreted differently for relative vs. absolute
    // joining fetches — confirm against the MoQ Transport draft in use.
    pub joining_start: u64,
}
47
48impl Decode for JoiningFetch {
49    fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
50        let joining_request_id = u64::decode(r)?;
51        let joining_start = u64::decode(r)?;
52
53        Ok(Self {
54            joining_request_id,
55            joining_start,
56        })
57    }
58}
59
60impl Encode for JoiningFetch {
61    fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
62        self.joining_request_id.encode(w)?;
63        self.joining_start.encode(w)?;
64
65        Ok(())
66    }
67}
68
69/// Sent by the subscriber to request to request a range
70/// of already published objects within a track.
71#[derive(Clone, Debug, Eq, PartialEq)]
72pub struct Fetch {
73    /// The fetch request ID
74    pub id: u64,
75
76    /// Subscriber Priority
77    pub subscriber_priority: u8,
78
79    /// Object delivery order
80    pub group_order: GroupOrder,
81
82    /// Standalone fetch vs Relative Joining fetch vs Absolute Joining fetch
83    pub fetch_type: FetchType,
84
85    /// Track properties for Standalone fetch
86    pub standalone_fetch: Option<StandaloneFetch>,
87
88    /// Joining properties for Relative Joining or Absolute Joining fetches.
89    pub joining_fetch: Option<JoiningFetch>,
90
91    /// Optional parameters
92    pub params: KeyValuePairs,
93}
94
95impl Decode for Fetch {
96    fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
97        let id = u64::decode(r)?;
98
99        let subscriber_priority = u8::decode(r)?;
100        let group_order = GroupOrder::decode(r)?;
101
102        let fetch_type = FetchType::decode(r)?;
103
104        let standalone_fetch: Option<StandaloneFetch>;
105        let joining_fetch: Option<JoiningFetch>;
106        match fetch_type {
107            FetchType::Standalone => {
108                standalone_fetch = Some(StandaloneFetch::decode(r)?);
109                joining_fetch = None;
110            }
111            FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
112                standalone_fetch = None;
113                joining_fetch = Some(JoiningFetch::decode(r)?);
114            }
115        };
116
117        let params = KeyValuePairs::decode(r)?;
118
119        Ok(Self {
120            id,
121            subscriber_priority,
122            group_order,
123            fetch_type,
124            standalone_fetch,
125            joining_fetch,
126            params,
127        })
128    }
129}
130
131impl Encode for Fetch {
132    fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
133        self.id.encode(w)?;
134
135        self.subscriber_priority.encode(w)?;
136        self.group_order.encode(w)?;
137
138        self.fetch_type.encode(w)?;
139
140        match self.fetch_type {
141            FetchType::Standalone => {
142                if let Some(standalone_fetch) = &self.standalone_fetch {
143                    standalone_fetch.encode(w)?;
144                } else {
145                    return Err(EncodeError::MissingField(
146                        "StandaloneFetch info".to_string(),
147                    ));
148                }
149            }
150            FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
151                if let Some(joining_fetch) = &self.joining_fetch {
152                    joining_fetch.encode(w)?;
153                } else {
154                    return Err(EncodeError::MissingField("JoiningFetch info".to_string()));
155                }
156            }
157        };
158
159        self.params.encode(w)?;
160
161        Ok(())
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;

    /// Builds a `Fetch` with the fixed header values shared by every test case;
    /// only the fetch type, payloads, and parameters vary.
    fn fetch_with(
        fetch_type: FetchType,
        standalone_fetch: Option<StandaloneFetch>,
        joining_fetch: Option<JoiningFetch>,
        params: KeyValuePairs,
    ) -> Fetch {
        Fetch {
            id: 12345,
            subscriber_priority: 127,
            group_order: GroupOrder::Publisher,
            fetch_type,
            standalone_fetch,
            joining_fetch,
            params,
        }
    }

    /// Joining payload used by both joining-fetch round-trip cases.
    fn sample_joining() -> JoiningFetch {
        JoiningFetch {
            joining_request_id: 382,
            joining_start: 3463,
        }
    }

    /// Encodes `msg` into `buf`, decodes it back out of the same buffer, and
    /// checks that the result equals the input.
    fn assert_round_trip(buf: &mut BytesMut, msg: &Fetch) {
        msg.encode(buf).unwrap();
        let decoded = Fetch::decode(buf).unwrap();
        assert_eq!(&decoded, msg);
    }

    #[test]
    fn encode_decode() {
        let mut buf = BytesMut::new();

        // A single parameter is enough to exercise the parameter path.
        let mut kvps = KeyValuePairs::new();
        kvps.set_bytesvalue(123, vec![0x00, 0x01, 0x02, 0x03]);

        // Standalone fetch.
        let standalone = fetch_with(
            FetchType::Standalone,
            Some(StandaloneFetch {
                track_namespace: TrackNamespace::from_utf8_path("test/path/to/resource"),
                track_name: "audiotrack".to_string(),
                start_location: Location::new(34, 53),
                end_location: Location::new(34, 53),
            }),
            None,
            kvps.clone(),
        );
        assert_round_trip(&mut buf, &standalone);

        // Relative joining fetch.
        let relative = fetch_with(
            FetchType::RelativeJoining,
            None,
            Some(sample_joining()),
            kvps.clone(),
        );
        assert_round_trip(&mut buf, &relative);

        // Absolute joining fetch.
        let absolute = fetch_with(
            FetchType::AbsoluteJoining,
            None,
            Some(sample_joining()),
            kvps.clone(),
        );
        assert_round_trip(&mut buf, &absolute);
    }

    #[test]
    fn encode_missing_fields() {
        let mut buf = BytesMut::new();

        // Standalone fetch without its standalone payload must be rejected.
        let no_standalone = fetch_with(FetchType::Standalone, None, None, Default::default());
        let outcome = no_standalone.encode(&mut buf);
        assert!(matches!(outcome.unwrap_err(), EncodeError::MissingField(_)));

        // Absolute joining fetch without its joining payload must be rejected.
        let no_joining = fetch_with(FetchType::AbsoluteJoining, None, None, Default::default());
        let outcome = no_joining.encode(&mut buf);
        assert!(matches!(outcome.unwrap_err(), EncodeError::MissingField(_)));
    }
}