1use std::borrow::Cow;
2
3use crate::{
4 coding::{Decode, DecodeError, Encode},
5 ietf::{
6 namespace::{decode_namespace, encode_namespace},
7 GroupOrder, Location, Message, Parameters, RequestId, Version,
8 },
9 Path,
10};
11
12#[derive(Debug, Clone, PartialEq, Eq)]
13pub enum FetchType<'a> {
14 Standalone {
16 namespace: Path<'a>,
17 track: Cow<'a, str>,
18 start: Location,
19 end: Location,
20 },
21 RelativeJoining {
22 subscriber_request_id: RequestId,
23 group_offset: u64,
24 },
25 AbsoluteJoining {
26 subscriber_request_id: RequestId,
27 group_id: u64,
28 },
29}
30
31impl<'a, V: Copy> Encode<V> for FetchType<'a> {
32 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) {
33 match self {
34 FetchType::Standalone {
35 namespace,
36 track,
37 start,
38 end,
39 } => {
40 1u8.encode(w, version);
41 encode_namespace(w, namespace, version);
42 track.encode(w, version);
43 start.encode(w, version);
44 end.encode(w, version);
45 }
46 FetchType::RelativeJoining {
47 subscriber_request_id,
48 group_offset,
49 } => {
50 2u8.encode(w, version);
51 subscriber_request_id.encode(w, version);
52 group_offset.encode(w, version);
53 }
54 FetchType::AbsoluteJoining {
55 subscriber_request_id,
56 group_id,
57 } => {
58 3u8.encode(w, version);
59 subscriber_request_id.encode(w, version);
60 group_id.encode(w, version);
61 }
62 }
63 }
64}
65
66impl<'a, V: Copy> Decode<V> for FetchType<'a> {
67 fn decode<B: bytes::Buf>(buf: &mut B, version: V) -> Result<Self, DecodeError> {
68 let fetch_type = u64::decode(buf, version)?;
69 Ok(match fetch_type {
70 0x1 => {
71 let namespace = decode_namespace(buf, version)?;
72 let track = Cow::<str>::decode(buf, version)?;
73 let start = Location::decode(buf, version)?;
74 let end = Location::decode(buf, version)?;
75 FetchType::Standalone {
76 namespace,
77 track,
78 start,
79 end,
80 }
81 }
82 0x2 => {
83 let subscriber_request_id = RequestId::decode(buf, version)?;
84 let group_offset = u64::decode(buf, version)?;
85 FetchType::RelativeJoining {
86 subscriber_request_id,
87 group_offset,
88 }
89 }
90 0x3 => {
91 let subscriber_request_id = RequestId::decode(buf, version)?;
92 let group_id = u64::decode(buf, version)?;
93 FetchType::AbsoluteJoining {
94 subscriber_request_id,
95 group_id,
96 }
97 }
98 _ => return Err(DecodeError::InvalidValue),
99 })
100 }
101}
102
103#[derive(Debug, Clone, PartialEq, Eq)]
104pub struct Fetch<'a> {
105 pub request_id: RequestId,
106 pub subscriber_priority: u8,
107 pub group_order: GroupOrder,
108 pub fetch_type: FetchType<'a>,
109 }
112
113impl<'a> Message for Fetch<'a> {
114 const ID: u64 = 0x16;
115
116 fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) {
117 self.request_id.encode(w, version);
118 self.subscriber_priority.encode(w, version);
119 self.group_order.encode(w, version);
120 self.fetch_type.encode(w, version);
121 0u8.encode(w, version);
123 }
124
125 fn decode_msg<B: bytes::Buf>(buf: &mut B, version: Version) -> Result<Self, DecodeError> {
126 let request_id = RequestId::decode(buf, version)?;
127 let subscriber_priority = u8::decode(buf, version)?;
128 let group_order = GroupOrder::decode(buf, version)?;
129 let fetch_type = FetchType::decode(buf, version)?;
130 let _params = Parameters::decode(buf, version)?;
132 Ok(Self {
133 request_id,
134 subscriber_priority,
135 group_order,
136 fetch_type,
137 })
138 }
139}
140
141#[derive(Debug, Clone, PartialEq, Eq)]
142pub struct FetchOk {
143 pub request_id: RequestId,
144 pub group_order: GroupOrder,
145 pub end_of_track: bool,
146 pub end_location: Location,
147 }
149impl Message for FetchOk {
150 const ID: u64 = 0x18;
151
152 fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) {
153 self.request_id.encode(w, version);
154 self.group_order.encode(w, version);
155 self.end_of_track.encode(w, version);
156 self.end_location.encode(w, version);
157 0u8.encode(w, version);
159 }
160
161 fn decode_msg<B: bytes::Buf>(buf: &mut B, version: Version) -> Result<Self, DecodeError> {
162 let request_id = RequestId::decode(buf, version)?;
163 let group_order = GroupOrder::decode(buf, version)?;
164 let end_of_track = bool::decode(buf, version)?;
165 let end_location = Location::decode(buf, version)?;
166 let _params = Parameters::decode(buf, version)?;
168 Ok(Self {
169 request_id,
170 group_order,
171 end_of_track,
172 end_location,
173 })
174 }
175}
176
177#[derive(Debug, Clone, PartialEq, Eq)]
178pub struct FetchError<'a> {
179 pub request_id: RequestId,
180 pub error_code: u64,
181 pub reason_phrase: Cow<'a, str>,
182}
183
184impl<'a> Message for FetchError<'a> {
185 const ID: u64 = 0x19;
186
187 fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) {
188 self.request_id.encode(w, version);
189 self.error_code.encode(w, version);
190 self.reason_phrase.encode(w, version);
191 }
192
193 fn decode_msg<B: bytes::Buf>(buf: &mut B, version: Version) -> Result<Self, DecodeError> {
194 let request_id = RequestId::decode(buf, version)?;
195 let error_code = u64::decode(buf, version)?;
196 let reason_phrase = Cow::<str>::decode(buf, version)?;
197 Ok(Self {
198 request_id,
199 error_code,
200 reason_phrase,
201 })
202 }
203}
204
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct FetchCancel {
207 pub request_id: RequestId,
208}
209impl Message for FetchCancel {
210 const ID: u64 = 0x17;
211
212 fn encode_msg<W: bytes::BufMut>(&self, w: &mut W, version: Version) {
213 self.request_id.encode(w, version);
214 }
215
216 fn decode_msg<B: bytes::Buf>(buf: &mut B, version: Version) -> Result<Self, DecodeError> {
217 let request_id = RequestId::decode(buf, version)?;
218 Ok(Self { request_id })
219 }
220}
221
222#[derive(Debug, Clone, PartialEq, Eq)]
223pub struct FetchHeader {
224 pub request_id: RequestId,
225}
226
227impl FetchHeader {
228 pub const TYPE: u64 = 0x5;
229}
230
231impl<V> Encode<V> for FetchHeader {
232 fn encode<W: bytes::BufMut>(&self, w: &mut W, version: V) {
233 self.request_id.encode(w, version);
234 }
235}
236
237impl<V> Decode<V> for FetchHeader {
238 fn decode<B: bytes::Buf>(buf: &mut B, version: V) -> Result<Self, DecodeError> {
239 let request_id = RequestId::decode(buf, version)?;
240 Ok(Self { request_id })
241 }
242}
243
244pub struct FetchObject {
246 }