1use std::io::Write;
2
3use crate::{
4 codec::{decoder::read_u32, Decoder, Encoder},
5 commands::{
6 close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
7 create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
8 credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
9 delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand,
10 exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
11 metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand,
12 publish::PublishCommand, query_offset::QueryOffsetRequest,
13 query_publisher_sequence::QueryPublisherRequest,
14 sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
15 store_offset::StoreOffset, subscribe::SubscribeCommand,
16 superstream_partitions::SuperStreamPartitionsRequest,
17 superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
18 unsubscribe::UnSubscribeCommand,
19 },
20 error::{DecodeError, EncodeError},
21 protocol::commands::*,
22 types::Header,
23};
24
25use byteorder::{BigEndian, WriteBytesExt};
26
27mod shims;
28#[derive(Debug, PartialEq, Eq)]
29pub struct Request {
30 header: Header,
31 kind: RequestKind,
32}
33
34impl Request {
35 pub fn kind(&self) -> &RequestKind {
37 &self.kind
38 }
39
40 pub fn header(&self) -> &Header {
42 &self.header
43 }
44}
45#[derive(Debug, PartialEq, Eq)]
46pub enum RequestKind {
47 PeerProperties(PeerPropertiesCommand),
48 SaslHandshake(SaslHandshakeCommand),
49 SaslAuthenticate(SaslAuthenticateCommand),
50 Tunes(TunesCommand),
51 Open(OpenCommand),
52 Close(CloseRequest),
53 Delete(Delete),
54 CreateStream(CreateStreamCommand),
55 Subscribe(SubscribeCommand),
56 Credit(CreditCommand),
57 Metadata(MetadataCommand),
58 DeclarePublisher(DeclarePublisherCommand),
59 DeletePublisher(DeletePublisherCommand),
60 Heartbeat(HeartBeatCommand),
61 Publish(PublishCommand),
62 QueryOffset(QueryOffsetRequest),
63 QueryPublisherSequence(QueryPublisherRequest),
64 StoreOffset(StoreOffset),
65 Unsubscribe(UnSubscribeCommand),
66 ExchangeCommandVersions(ExchangeCommandVersionsRequest),
67 CreateSuperStream(CreateSuperStreamCommand),
68 DeleteSuperStream(DeleteSuperStreamCommand),
69 SuperStreamPartitions(SuperStreamPartitionsRequest),
70 SuperStreamRoute(SuperStreamRouteRequest),
71 ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
72}
73
74impl Encoder for RequestKind {
75 fn encoded_size(&self) -> u32 {
76 match self {
77 RequestKind::PeerProperties(peer) => peer.encoded_size(),
78 RequestKind::SaslHandshake(handshake) => handshake.encoded_size(),
79 RequestKind::SaslAuthenticate(authenticate) => authenticate.encoded_size(),
80 RequestKind::Tunes(tunes) => tunes.encoded_size(),
81 RequestKind::Open(open) => open.encoded_size(),
82 RequestKind::Delete(delete) => delete.encoded_size(),
83 RequestKind::CreateStream(create_stream) => create_stream.encoded_size(),
84 RequestKind::Subscribe(subscribe) => subscribe.encoded_size(),
85 RequestKind::Credit(credit) => credit.encoded_size(),
86 RequestKind::Metadata(metadata) => metadata.encoded_size(),
87 RequestKind::Close(close) => close.encoded_size(),
88 RequestKind::DeclarePublisher(declare_publisher) => declare_publisher.encoded_size(),
89 RequestKind::DeletePublisher(delete_publisher) => delete_publisher.encoded_size(),
90 RequestKind::Heartbeat(heartbeat) => heartbeat.encoded_size(),
91 RequestKind::Publish(publish) => publish.encoded_size(),
92 RequestKind::QueryOffset(query_offset) => query_offset.encoded_size(),
93 RequestKind::QueryPublisherSequence(query_publisher) => query_publisher.encoded_size(),
94 RequestKind::StoreOffset(store_offset) => store_offset.encoded_size(),
95 RequestKind::Unsubscribe(unsubscribe) => unsubscribe.encoded_size(),
96 RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
97 exchange_command_versions.encoded_size()
98 }
99 RequestKind::CreateSuperStream(create_super_stream) => {
100 create_super_stream.encoded_size()
101 }
102 RequestKind::DeleteSuperStream(delete_super_stream) => {
103 delete_super_stream.encoded_size()
104 }
105 RequestKind::SuperStreamPartitions(super_stream_partitions) => {
106 super_stream_partitions.encoded_size()
107 }
108 RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(),
109 RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
110 consumer_update_request.encoded_size()
111 }
112 }
113 }
114
115 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
116 match self {
117 RequestKind::PeerProperties(peer) => peer.encode(writer),
118 RequestKind::SaslHandshake(handshake) => handshake.encode(writer),
119 RequestKind::SaslAuthenticate(authenticate) => authenticate.encode(writer),
120 RequestKind::Tunes(tunes) => tunes.encode(writer),
121 RequestKind::Open(open) => open.encode(writer),
122 RequestKind::Delete(delete) => delete.encode(writer),
123 RequestKind::CreateStream(create_stream) => create_stream.encode(writer),
124 RequestKind::Subscribe(subscribe) => subscribe.encode(writer),
125 RequestKind::Credit(credit) => credit.encode(writer),
126 RequestKind::Metadata(metadata) => metadata.encode(writer),
127 RequestKind::Close(close) => close.encode(writer),
128 RequestKind::DeclarePublisher(declare_publisher) => declare_publisher.encode(writer),
129 RequestKind::DeletePublisher(delete_publisher) => delete_publisher.encode(writer),
130 RequestKind::Heartbeat(heartbeat) => heartbeat.encode(writer),
131 RequestKind::Publish(publish) => publish.encode(writer),
132 RequestKind::QueryOffset(query_offset) => query_offset.encode(writer),
133 RequestKind::QueryPublisherSequence(query_publisher) => query_publisher.encode(writer),
134 RequestKind::StoreOffset(store_offset) => store_offset.encode(writer),
135 RequestKind::Unsubscribe(unsubcribe) => unsubcribe.encode(writer),
136 RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
137 exchange_command_versions.encode(writer)
138 }
139 RequestKind::CreateSuperStream(create_super_stream) => {
140 create_super_stream.encode(writer)
141 }
142 RequestKind::DeleteSuperStream(delete_super_stream) => {
143 delete_super_stream.encode(writer)
144 }
145 RequestKind::SuperStreamPartitions(super_stream_partition) => {
146 super_stream_partition.encode(writer)
147 }
148 RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer),
149 RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
150 consumer_update_request.encode(writer)
151 }
152 }
153 }
154}
155impl Encoder for Request {
156 fn encoded_size(&self) -> u32 {
157 self.header.encoded_size() + self.kind.encoded_size()
158 }
159
160 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
161 writer.write_u32::<BigEndian>(self.encoded_size())?;
162 self.header.encode(writer)?;
163 self.kind.encode(writer)?;
164 Ok(())
165 }
166}
167
168impl Decoder for Request {
169 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
170 let (input, _) = read_u32(input)?;
171 let (input, header) = Header::decode(input)?;
172
173 let (input, cmd) = match header.key() {
174 COMMAND_OPEN => OpenCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
175 COMMAND_PEER_PROPERTIES => {
176 PeerPropertiesCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
177 }
178 COMMAND_SASL_HANDSHAKE => {
179 SaslHandshakeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
180 }
181 COMMAND_SASL_AUTHENTICATE => {
182 SaslAuthenticateCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
183 }
184 COMMAND_TUNE => TunesCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
185 COMMAND_DELETE_STREAM => Delete::decode(input).map(|(i, kind)| (i, kind.into()))?,
186 COMMAND_CREATE_STREAM => {
187 CreateStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
188 }
189 COMMAND_METADATA => MetadataCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
190 COMMAND_CLOSE => CloseRequest::decode(input).map(|(i, kind)| (i, kind.into()))?,
191 COMMAND_CREDIT => CreditCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
192 COMMAND_DECLARE_PUBLISHER => {
193 DeclarePublisherCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
194 }
195 COMMAND_DELETE_PUBLISHER => {
196 DeletePublisherCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
197 }
198
199 COMMAND_HEARTBEAT => {
200 HeartBeatCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
201 }
202 COMMAND_PUBLISH => PublishCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
203 COMMAND_QUERY_OFFSET => {
204 QueryOffsetRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
205 }
206 COMMAND_QUERY_PUBLISHER_SEQUENCE => {
207 QueryPublisherRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
208 }
209
210 COMMAND_STORE_OFFSET => StoreOffset::decode(input).map(|(i, kind)| (i, kind.into()))?,
211 COMMAND_SUBSCRIBE => {
212 SubscribeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
213 }
214 COMMAND_UNSUBSCRIBE => {
215 UnSubscribeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
216 }
217 COMMAND_EXCHANGE_COMMAND_VERSIONS => {
218 ExchangeCommandVersionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
219 }
220 COMMAND_CREATE_SUPER_STREAM => {
221 CreateSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
222 }
223 COMMAND_DELETE_SUPER_STREAM => {
224 DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
225 }
226 COMMAND_PARTITIONS => {
227 SuperStreamPartitionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
228 }
229 COMMAND_ROUTE => {
230 SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
231 }
232 COMMAND_CONSUMER_UPDATE_REQUEST => {
233 ConsumerUpdateRequestCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
234 }
235 n => return Err(DecodeError::UnsupportedResponseType(n)),
236 };
237 Ok((input, Request { header, kind: cmd }))
238 }
239}
240
241#[cfg(test)]
242mod tests {
243
244 use crate::{
245 codec::{Decoder, Encoder},
246 commands::{
247 close::CloseRequest, create_stream::CreateStreamCommand,
248 create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
249 declare_publisher::DeclarePublisherCommand, delete::Delete,
250 delete_publisher::DeletePublisherCommand,
251 delete_super_stream::DeleteSuperStreamCommand,
252 exchange_command_versions::ExchangeCommandVersionsRequest,
253 heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand,
254 peer_properties::PeerPropertiesCommand, publish::PublishCommand,
255 query_offset::QueryOffsetRequest, query_publisher_sequence::QueryPublisherRequest,
256 sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
257 store_offset::StoreOffset, subscribe::SubscribeCommand,
258 superstream_partitions::SuperStreamPartitionsRequest,
259 superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
260 unsubscribe::UnSubscribeCommand, Command,
261 },
262 };
263
264 use std::fmt::Debug;
265
266 use super::Request;
267 use fake::{Dummy, Fake, Faker};
268
269 #[test]
270 fn request_open_test() {
271 request_encode_decode_test::<OpenCommand>()
272 }
273
274 #[test]
275 fn request_peer_properties_test() {
276 request_encode_decode_test::<PeerPropertiesCommand>()
277 }
278
279 #[test]
280 fn request_create_stream_test() {
281 request_encode_decode_test::<CreateStreamCommand>()
282 }
283
284 #[test]
285 fn request_delete_stream_test() {
286 request_encode_decode_test::<Delete>()
287 }
288
289 #[test]
290 fn request_sasl_authenticate_test() {
291 request_encode_decode_test::<SaslAuthenticateCommand>()
292 }
293
294 #[test]
295 fn request_sasl_handshake_test() {
296 request_encode_decode_test::<SaslHandshakeCommand>()
297 }
298
299 #[test]
300 fn request_tune_test() {
301 request_encode_decode_test::<TunesCommand>()
302 }
303
304 #[test]
305 fn request_metadata_test() {
306 request_encode_decode_test::<MetadataCommand>()
307 }
308 #[test]
309 fn request_close_test() {
310 request_encode_decode_test::<CloseRequest>()
311 }
312
313 #[test]
314 fn request_credit_test() {
315 request_encode_decode_test::<CreditCommand>()
316 }
317
318 #[test]
319 fn request_declare_publisher_test() {
320 request_encode_decode_test::<DeclarePublisherCommand>()
321 }
322 #[test]
323 fn request_delete_publisher_test() {
324 request_encode_decode_test::<DeletePublisherCommand>()
325 }
326 #[test]
327 fn request_heartbeat_test() {
328 request_encode_decode_test::<HeartBeatCommand>()
329 }
330
331 #[test]
332 fn request_publish_test() {
333 request_encode_decode_test::<PublishCommand>()
334 }
335 #[test]
336 fn request_query_offset_test() {
337 request_encode_decode_test::<QueryOffsetRequest>()
338 }
339 #[test]
340 fn request_query_publisher_test() {
341 request_encode_decode_test::<QueryPublisherRequest>()
342 }
343
344 #[test]
345 fn request_store_offset_test() {
346 request_encode_decode_test::<StoreOffset>()
347 }
348 #[test]
349 fn request_subscribe_test() {
350 request_encode_decode_test::<SubscribeCommand>()
351 }
352 #[test]
353 fn request_unsubscribe_test() {
354 request_encode_decode_test::<UnSubscribeCommand>()
355 }
356 fn request_encode_decode_test<T>()
357 where
358 T: Dummy<Faker> + Encoder + Decoder + Debug + PartialEq + Command + Into<Request>,
359 {
360 let command: T = Faker.fake();
361
362 let request: Request = command.into();
363
364 let mut buffer = vec![];
365 let _ = request.encode(&mut buffer).unwrap();
366
367 let (remaining, decoded) = Request::decode(&buffer).unwrap();
368
369 assert_eq!(buffer[4..].len(), decoded.encoded_size() as usize);
370
371 assert_eq!(request, decoded);
372
373 assert!(remaining.is_empty());
374 }
375
376 #[test]
377 fn request_exchange_command_versions_test() {
378 request_encode_decode_test::<ExchangeCommandVersionsRequest>()
379 }
380
381 #[test]
382 fn request_create_super_stream_test() {
383 request_encode_decode_test::<CreateSuperStreamCommand>()
384 }
385
386 #[test]
387 fn request_delete_super_stream_test() {
388 request_encode_decode_test::<DeleteSuperStreamCommand>()
389 }
390
391 #[test]
392 fn request_partitions_command() {
393 request_encode_decode_test::<SuperStreamPartitionsRequest>()
394 }
395
396 #[test]
397 fn request_route_command() {
398 request_encode_decode_test::<SuperStreamRouteRequest>()
399 }
400}