1use crate::coding::{
2 Decode, DecodeError, Encode, EncodeError, KeyValuePairs, Location, TrackNamespace,
3};
4use crate::message::{FetchType, GroupOrder};
5
6#[derive(Clone, Debug, Eq, PartialEq)]
7pub struct StandaloneFetch {
8 pub track_namespace: TrackNamespace,
9 pub track_name: String,
10 pub start_location: Location,
11 pub end_location: Location,
12}
13
14impl Decode for StandaloneFetch {
15 fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
16 let track_namespace = TrackNamespace::decode(r)?;
17 let track_name = String::decode(r)?;
18 let start_location = Location::decode(r)?;
19 let end_location = Location::decode(r)?;
20
21 Ok(Self {
22 track_namespace,
23 track_name,
24 start_location,
25 end_location,
26 })
27 }
28}
29
30impl Encode for StandaloneFetch {
31 fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
32 self.track_namespace.encode(w)?;
33 self.track_name.encode(w)?;
34 self.start_location.encode(w)?;
35 self.end_location.encode(w)?;
36
37 Ok(())
38 }
39}
40
41#[derive(Clone, Debug, Eq, PartialEq)]
42pub struct JoiningFetch {
43 pub joining_request_id: u64,
45 pub joining_start: u64,
46}
47
48impl Decode for JoiningFetch {
49 fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
50 let joining_request_id = u64::decode(r)?;
51 let joining_start = u64::decode(r)?;
52
53 Ok(Self {
54 joining_request_id,
55 joining_start,
56 })
57 }
58}
59
60impl Encode for JoiningFetch {
61 fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
62 self.joining_request_id.encode(w)?;
63 self.joining_start.encode(w)?;
64
65 Ok(())
66 }
67}
68
69#[derive(Clone, Debug, Eq, PartialEq)]
72pub struct Fetch {
73 pub id: u64,
75
76 pub subscriber_priority: u8,
78
79 pub group_order: GroupOrder,
81
82 pub fetch_type: FetchType,
84
85 pub standalone_fetch: Option<StandaloneFetch>,
87
88 pub joining_fetch: Option<JoiningFetch>,
90
91 pub params: KeyValuePairs,
93}
94
95impl Decode for Fetch {
96 fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
97 let id = u64::decode(r)?;
98
99 let subscriber_priority = u8::decode(r)?;
100 let group_order = GroupOrder::decode(r)?;
101
102 let fetch_type = FetchType::decode(r)?;
103
104 let standalone_fetch: Option<StandaloneFetch>;
105 let joining_fetch: Option<JoiningFetch>;
106 match fetch_type {
107 FetchType::Standalone => {
108 standalone_fetch = Some(StandaloneFetch::decode(r)?);
109 joining_fetch = None;
110 }
111 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
112 standalone_fetch = None;
113 joining_fetch = Some(JoiningFetch::decode(r)?);
114 }
115 };
116
117 let params = KeyValuePairs::decode(r)?;
118
119 Ok(Self {
120 id,
121 subscriber_priority,
122 group_order,
123 fetch_type,
124 standalone_fetch,
125 joining_fetch,
126 params,
127 })
128 }
129}
130
131impl Encode for Fetch {
132 fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
133 self.id.encode(w)?;
134
135 self.subscriber_priority.encode(w)?;
136 self.group_order.encode(w)?;
137
138 self.fetch_type.encode(w)?;
139
140 match self.fetch_type {
141 FetchType::Standalone => {
142 if let Some(standalone_fetch) = &self.standalone_fetch {
143 standalone_fetch.encode(w)?;
144 } else {
145 return Err(EncodeError::MissingField(
146 "StandaloneFetch info".to_string(),
147 ));
148 }
149 }
150 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
151 if let Some(joining_fetch) = &self.joining_fetch {
152 joining_fetch.encode(w)?;
153 } else {
154 return Err(EncodeError::MissingField("JoiningFetch info".to_string()));
155 }
156 }
157 };
158
159 self.params.encode(w)?;
160
161 Ok(())
162 }
163}
164
165#[cfg(test)]
166mod tests {
167 use super::*;
168 use bytes::BytesMut;
169
170 #[test]
171 fn encode_decode() {
172 let mut buf = BytesMut::new();
173
174 let mut kvps = KeyValuePairs::new();
176 kvps.set_bytesvalue(123, vec![0x00, 0x01, 0x02, 0x03]);
177
178 let msg = Fetch {
180 id: 12345,
181 subscriber_priority: 127,
182 group_order: GroupOrder::Publisher,
183 fetch_type: FetchType::Standalone,
184 standalone_fetch: Some(StandaloneFetch {
185 track_namespace: TrackNamespace::from_utf8_path("test/path/to/resource"),
186 track_name: "audiotrack".to_string(),
187 start_location: Location::new(34, 53),
188 end_location: Location::new(34, 53),
189 }),
190 joining_fetch: None,
191 params: kvps.clone(),
192 };
193 msg.encode(&mut buf).unwrap();
194 let decoded = Fetch::decode(&mut buf).unwrap();
195 assert_eq!(decoded, msg);
196
197 let msg = Fetch {
199 id: 12345,
200 subscriber_priority: 127,
201 group_order: GroupOrder::Publisher,
202 fetch_type: FetchType::RelativeJoining,
203 standalone_fetch: None,
204 joining_fetch: Some(JoiningFetch {
205 joining_request_id: 382,
206 joining_start: 3463,
207 }),
208 params: kvps.clone(),
209 };
210 msg.encode(&mut buf).unwrap();
211 let decoded = Fetch::decode(&mut buf).unwrap();
212 assert_eq!(decoded, msg);
213
214 let msg = Fetch {
216 id: 12345,
217 subscriber_priority: 127,
218 group_order: GroupOrder::Publisher,
219 fetch_type: FetchType::AbsoluteJoining,
220 standalone_fetch: None,
221 joining_fetch: Some(JoiningFetch {
222 joining_request_id: 382,
223 joining_start: 3463,
224 }),
225 params: kvps.clone(),
226 };
227 msg.encode(&mut buf).unwrap();
228 let decoded = Fetch::decode(&mut buf).unwrap();
229 assert_eq!(decoded, msg);
230 }
231
232 #[test]
233 fn encode_missing_fields() {
234 let mut buf = BytesMut::new();
235
236 let msg = Fetch {
238 id: 12345,
239 subscriber_priority: 127,
240 group_order: GroupOrder::Publisher,
241 fetch_type: FetchType::Standalone,
242 standalone_fetch: None,
243 joining_fetch: None,
244 params: Default::default(),
245 };
246 let encoded = msg.encode(&mut buf);
247 assert!(matches!(encoded.unwrap_err(), EncodeError::MissingField(_)));
248
249 let msg = Fetch {
251 id: 12345,
252 subscriber_priority: 127,
253 group_order: GroupOrder::Publisher,
254 fetch_type: FetchType::AbsoluteJoining,
255 standalone_fetch: None,
256 joining_fetch: None,
257 params: Default::default(),
258 };
259 let encoded = msg.encode(&mut buf);
260 assert!(matches!(encoded.unwrap_err(), EncodeError::MissingField(_)));
261 }
262}