Skip to main content

moq_transport/message/
fetch_ok.rs

1use crate::coding::{Decode, DecodeError, Encode, EncodeError, KeyValuePairs, Location};
2use crate::message::GroupOrder;
3
4/// A publisher sends a FETCH_OK control message in response to successful fetches.
5#[derive(Clone, Debug, Eq, PartialEq)]
6pub struct FetchOk {
7    /// The Fetch request ID of the Fetch this message is replying to.
8    pub id: u64,
9
10    /// Order groups will be delivered in
11    pub group_order: GroupOrder,
12
13    /// True if all objects have been published on this track
14    pub end_of_track: bool,
15
16    /// The largest object covered by the fetch response
17    pub end_location: Location,
18
19    /// Optional parameters
20    pub params: KeyValuePairs,
21}
22
23impl Decode for FetchOk {
24    fn decode<R: bytes::Buf>(r: &mut R) -> Result<Self, DecodeError> {
25        let id = u64::decode(r)?;
26
27        let group_order = GroupOrder::decode(r)?;
28        // GroupOrder enum has Publisher in it, but it's not allowed to be used in this
29        // FetchOk message, so validate it now so we can return a protocol error.
30        if group_order == GroupOrder::Publisher {
31            return Err(DecodeError::InvalidGroupOrder);
32        }
33        let end_of_track = bool::decode(r)?;
34        let end_location = Location::decode(r)?;
35        let params = KeyValuePairs::decode(r)?;
36
37        Ok(Self {
38            id,
39            group_order,
40            end_of_track,
41            end_location,
42            params,
43        })
44    }
45}
46
47impl Encode for FetchOk {
48    fn encode<W: bytes::BufMut>(&self, w: &mut W) -> Result<(), EncodeError> {
49        self.id.encode(w)?;
50
51        // GroupOrder enum has Publisher in it, but it's not allowed to be used in this
52        // FetchOk message.
53        if self.group_order == GroupOrder::Publisher {
54            return Err(EncodeError::InvalidValue);
55        }
56        self.group_order.encode(w)?;
57        self.end_of_track.encode(w)?;
58        self.end_location.encode(w)?;
59        self.params.encode(w)?;
60
61        Ok(())
62    }
63}
64
65#[cfg(test)]
66mod tests {
67    use super::*;
68    use bytes::BytesMut;
69
70    #[test]
71    fn encode_decode() {
72        let mut buf = BytesMut::new();
73
74        // One parameter for testing
75        let mut kvps = KeyValuePairs::new();
76        kvps.set_bytesvalue(123, vec![0x00, 0x01, 0x02, 0x03]);
77
78        let msg = FetchOk {
79            id: 12345,
80            group_order: GroupOrder::Descending,
81            end_of_track: true,
82            end_location: Location::new(2, 3),
83            params: kvps.clone(),
84        };
85        msg.encode(&mut buf).unwrap();
86        let decoded = FetchOk::decode(&mut buf).unwrap();
87        assert_eq!(decoded, msg);
88    }
89
90    #[test]
91    fn encode_bad_group_order() {
92        let mut buf = BytesMut::new();
93
94        let msg = FetchOk {
95            id: 12345,
96            group_order: GroupOrder::Publisher,
97            end_of_track: true,
98            end_location: Location::new(2, 3),
99            params: Default::default(),
100        };
101        let encoded = msg.encode(&mut buf);
102        assert!(matches!(encoded.unwrap_err(), EncodeError::InvalidValue));
103    }
104}