1use std::convert::TryInto;
2
3use crate::{
4 codec::{
5 decoder::{read_u16, read_u32},
6 Decoder,
7 },
8 commands::{
9 close::CloseResponse, consumer_update::ConsumerUpdateCommand,
10 consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse,
11 deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse,
12 generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse,
13 metadata_update::MetadataUpdateCommand, open::OpenResponse,
14 peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
15 publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
16 query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
17 superstream_partitions::SuperStreamPartitionsResponse,
18 superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
19 },
20 error::DecodeError,
21 protocol::commands::*,
22 types::Header,
23};
24
25mod shims;
26
/// Status code carried inside protocol responses.
///
/// The [`Decoder`] implementation for this type reads a `u16` from the input
/// and converts it into a variant with `TryInto`.
///
/// NOTE(review): `SaslChallange` and `PrecoditionFailed` look like misspellings
/// of "challenge" and "precondition". They are left untouched because the
/// variant names are part of the public API.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ResponseCode {
    Ok,
    StreamDoesNotExist,
    SubscriptionIdAlreadyExists,
    SubscriptionIdDoesNotExist,
    StreamAlreadyExists,
    StreamNotAvailable,
    SaslMechanismNotSupported,
    AuthenticationFailure,
    SaslError,
    SaslChallange,
    AuthenticationFailureLoopback,
    VirtualHostAccessFailure,
    UnknownFrame,
    FrameTooLarge,
    InternalError,
    AccessRefused,
    PrecoditionFailed,
    PublisherDoesNotExist,
    OffsetNotFound,
}
50#[derive(Debug, PartialEq, Eq)]
51pub struct Response {
52 header: Header,
53 pub(crate) kind: ResponseKind,
54}
55
56#[derive(Debug, PartialEq, Eq)]
57pub enum ResponseKind {
58 Open(OpenResponse),
59 Close(CloseResponse),
60 PeerProperties(PeerPropertiesResponse),
61 SaslHandshake(SaslHandshakeResponse),
62 Generic(GenericResponse),
63 Tunes(TunesCommand),
64 Deliver(DeliverCommand),
65 Heartbeat(HeartbeatResponse),
66 Metadata(MetadataResponse),
67 MetadataUpdate(MetadataUpdateCommand),
68 PublishConfirm(PublishConfirm),
69 PublishError(PublishErrorResponse),
70 QueryOffset(QueryOffsetResponse),
71 QueryPublisherSequence(QueryPublisherResponse),
72 Credit(CreditResponse),
73 ExchangeCommandVersions(ExchangeCommandVersionsResponse),
74 SuperStreamPartitions(SuperStreamPartitionsResponse),
75 SuperStreamRoute(SuperStreamRouteResponse),
76 ConsumerUpdate(ConsumerUpdateCommand),
77 ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
78}
79
80impl Response {
81 pub fn new(header: Header, kind: ResponseKind) -> Self {
82 Self { header, kind }
83 }
84
85 pub fn correlation_id(&self) -> Option<u32> {
86 match &self.kind {
87 ResponseKind::Open(open) => Some(open.correlation_id),
88 ResponseKind::Close(close) => Some(close.correlation_id),
89 ResponseKind::PeerProperties(peer_properties) => Some(peer_properties.correlation_id),
90 ResponseKind::SaslHandshake(handshake) => Some(handshake.correlation_id),
91 ResponseKind::Generic(generic) => Some(generic.correlation_id),
92 ResponseKind::Metadata(metadata) => Some(metadata.correlation_id),
93 ResponseKind::QueryOffset(query_offset) => Some(query_offset.correlation_id),
94 ResponseKind::QueryPublisherSequence(query_publisher) => {
95 Some(query_publisher.correlation_id)
96 }
97 ResponseKind::MetadataUpdate(_) => None,
98 ResponseKind::PublishConfirm(_) => None,
99 ResponseKind::PublishError(_) => None,
100 ResponseKind::Tunes(_) => None,
101 ResponseKind::Heartbeat(_) => None,
102 ResponseKind::Deliver(_) => None,
103 ResponseKind::Credit(_) => None,
104 ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
105 Some(exchange_command_versions.correlation_id)
106 }
107 ResponseKind::SuperStreamPartitions(super_stream_partitions_command) => {
108 Some(super_stream_partitions_command.correlation_id)
109 }
110 ResponseKind::SuperStreamRoute(super_stream_route_command) => {
111 Some(super_stream_route_command.correlation_id)
112 }
113 ResponseKind::ConsumerUpdate(consumer_update_command) => {
114 Some(consumer_update_command.correlation_id)
115 }
116 ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => {
117 Some(consumer_update_request_command.correlation_id)
118 }
119 }
120 }
121
122 pub fn get<T>(self) -> Option<T>
123 where
124 T: FromResponse,
125 {
126 T::from_response(self)
127 }
128
129 pub fn kind_ref(&self) -> &ResponseKind {
130 &self.kind
131 }
132 pub fn kind(self) -> ResponseKind {
133 self.kind
134 }
135}
136
137impl Decoder for Response {
138 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
139 let (input, _) = read_u32(input)?;
140
141 let (input, header) = Header::decode(input)?;
142 let (input, kind) = match header.key() {
143 COMMAND_OPEN => {
144 OpenResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Open(kind)))?
145 }
146
147 COMMAND_CLOSE => {
148 CloseResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Close(kind)))?
149 }
150 COMMAND_PEER_PROPERTIES => PeerPropertiesResponse::decode(input)
151 .map(|(i, kind)| (i, ResponseKind::PeerProperties(kind)))?,
152 COMMAND_SASL_HANDSHAKE => SaslHandshakeResponse::decode(input)
153 .map(|(i, kind)| (i, ResponseKind::SaslHandshake(kind)))?,
154
155 COMMAND_DECLARE_PUBLISHER
156 | COMMAND_DELETE_PUBLISHER
157 | COMMAND_SASL_AUTHENTICATE
158 | COMMAND_SUBSCRIBE
159 | COMMAND_UNSUBSCRIBE
160 | COMMAND_CREATE_STREAM
161 | COMMAND_CREATE_SUPER_STREAM
162 | COMMAND_DELETE_SUPER_STREAM
163 | COMMAND_CONSUMER_UPDATE_REQUEST
164 | COMMAND_DELETE_STREAM => {
165 GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
166 }
167 COMMAND_TUNE => {
168 TunesCommand::decode(input).map(|(i, kind)| (i, ResponseKind::Tunes(kind)))?
169 }
170 COMMAND_DELIVER => DeliverCommand::decode(input)
171 .map(|(remaining, kind)| (remaining, ResponseKind::Deliver(kind)))?,
172
173 COMMAND_HEARTBEAT => HeartbeatResponse::decode(input)
174 .map(|(remaining, kind)| (remaining, ResponseKind::Heartbeat(kind)))?,
175 COMMAND_METADATA => MetadataResponse::decode(input)
176 .map(|(remaining, kind)| (remaining, ResponseKind::Metadata(kind)))?,
177 COMMAND_METADATA_UPDATE => MetadataUpdateCommand::decode(input)
178 .map(|(remaining, kind)| (remaining, ResponseKind::MetadataUpdate(kind)))?,
179 COMMAND_PUBLISH_CONFIRM => PublishConfirm::decode(input)
180 .map(|(remaining, kind)| (remaining, ResponseKind::PublishConfirm(kind)))?,
181
182 COMMAND_PUBLISH_ERROR => PublishErrorResponse::decode(input)
183 .map(|(remaining, kind)| (remaining, ResponseKind::PublishError(kind)))?,
184
185 COMMAND_QUERY_OFFSET => QueryOffsetResponse::decode(input)
186 .map(|(remaining, kind)| (remaining, ResponseKind::QueryOffset(kind)))?,
187
188 COMMAND_CREDIT => CreditResponse::decode(input)
189 .map(|(remaining, kind)| (remaining, ResponseKind::Credit(kind)))?,
190
191 COMMAND_QUERY_PUBLISHER_SEQUENCE => QueryPublisherResponse::decode(input)
192 .map(|(remaining, kind)| (remaining, ResponseKind::QueryPublisherSequence(kind)))?,
193 COMMAND_EXCHANGE_COMMAND_VERSIONS => ExchangeCommandVersionsResponse::decode(input)
194 .map(|(remaining, kind)| {
195 (remaining, ResponseKind::ExchangeCommandVersions(kind))
196 })?,
197 COMMAND_PARTITIONS => SuperStreamPartitionsResponse::decode(input)
198 .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?,
199 COMMAND_ROUTE => SuperStreamRouteResponse::decode(input)
200 .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?,
201 COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input)
202 .map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?,
203 n => return Err(DecodeError::UnsupportedResponseType(n)),
204 };
205 Ok((input, Response { header, kind }))
206 }
207}
208
209impl Decoder for ResponseCode {
210 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
211 let (input, code) = read_u16(input)?;
212 Ok((input, code.try_into()?))
213 }
214}
215
216pub trait FromResponse
217where
218 Self: Sized,
219{
220 fn from_response(response: Response) -> Option<Self>;
221}
222
#[cfg(test)]
mod tests {

    use std::collections::HashMap;

    use byteorder::{BigEndian, WriteBytesExt};

    use super::{Response, ResponseKind};
    use crate::{
        codec::{Decoder, Encoder},
        commands::{
            close::CloseResponse, consumer_update::ConsumerUpdateCommand,
            consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand,
            exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
            heart_beat::HeartbeatResponse, metadata::MetadataResponse,
            metadata_update::MetadataUpdateCommand, open::OpenResponse,
            peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
            publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
            query_publisher_sequence::QueryPublisherResponse,
            sasl_handshake::SaslHandshakeResponse,
            superstream_partitions::SuperStreamPartitionsResponse,
            superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
        },
        protocol::{
            commands::{
                COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST,
                COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE,
                COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM,
                COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE,
                COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
            },
            version::PROTOCOL_VERSION,
        },
        response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
        types::Header,
        ResponseCode,
    };

    /// Expands to a `match` over every `ResponseKind` variant, binding the
    /// wrapped payload to `$payload` and evaluating `$body` in each arm.
    macro_rules! with_payload {
        ($kind:expr, $payload:ident => $body:expr) => {
            match $kind {
                ResponseKind::Open($payload) => $body,
                ResponseKind::Close($payload) => $body,
                ResponseKind::PeerProperties($payload) => $body,
                ResponseKind::SaslHandshake($payload) => $body,
                ResponseKind::Generic($payload) => $body,
                ResponseKind::Tunes($payload) => $body,
                ResponseKind::Heartbeat($payload) => $body,
                ResponseKind::Deliver($payload) => $body,
                ResponseKind::Metadata($payload) => $body,
                ResponseKind::MetadataUpdate($payload) => $body,
                ResponseKind::PublishConfirm($payload) => $body,
                ResponseKind::PublishError($payload) => $body,
                ResponseKind::QueryOffset($payload) => $body,
                ResponseKind::QueryPublisherSequence($payload) => $body,
                ResponseKind::Credit($payload) => $body,
                ResponseKind::ExchangeCommandVersions($payload) => $body,
                ResponseKind::SuperStreamPartitions($payload) => $body,
                ResponseKind::SuperStreamRoute($payload) => $body,
                ResponseKind::ConsumerUpdate($payload) => $body,
                ResponseKind::ConsumerUpdateRequest($payload) => $body,
            }
        };
    }

    /// Test-only encoder: delegates to the encoder of the wrapped payload.
    impl Encoder for ResponseKind {
        fn encoded_size(&self) -> u32 {
            with_payload!(self, payload => payload.encoded_size())
        }

        fn encode(
            &self,
            writer: &mut impl std::io::Write,
        ) -> Result<(), crate::error::EncodeError> {
            with_payload!(self, payload => payload.encode(writer))
        }
    }

    /// Test-only encoder producing the bytes that `Response::decode` consumes:
    /// a big-endian `u32` size, then the header, then the payload.
    impl Encoder for Response {
        fn encoded_size(&self) -> u32 {
            // NOTE(review): the extra 2 bytes are not explained in this file.
            // `Response::decode` discards the size prefix, so the round-trip
            // tests below do not validate this value.
            let header_size = self.header.encoded_size();
            let payload_size = self.kind.encoded_size();
            header_size + 2 + payload_size
        }

        fn encode(
            &self,
            writer: &mut impl std::io::Write,
        ) -> Result<(), crate::error::EncodeError> {
            let frame_size = self.encoded_size();
            writer.write_u32::<BigEndian>(frame_size)?;
            self.header.encode(writer)?;
            self.kind.encode(writer)
        }
    }

    /// Round-trips the given payload: wraps it in a `Response` with command
    /// key `$cmd`, encodes it, decodes the bytes again and checks that the
    /// result equals the input and that no bytes are left over.
    macro_rules! response_payload_test {
        ($payload:expr, $variant:path, $cmd:expr) => {
            let expected = Response {
                header: Header::new($cmd, PROTOCOL_VERSION),
                kind: $variant($payload),
            };

            let mut bytes = vec![];
            expected.encode(&mut bytes).unwrap();

            let (leftover, actual) = Response::decode(&bytes).unwrap();

            assert_eq!(expected, actual);
            assert!(leftover.is_empty());
        };
    }

    /// Same round-trip as `response_payload_test!`, with a payload of type
    /// `$ty` generated by `fake`.
    macro_rules! response_test {
        ($ty:ty, $variant:path, $cmd:expr) => {
            use fake::{Fake, Faker};
            let payload: $ty = Faker.fake();
            response_payload_test!(payload, $variant, $cmd);
        };
    }

    /// Round-trips a faked `OpenResponse` whose code is forced to `Ok`.
    #[test]
    fn open_response_ok_test() {
        use fake::{Fake, Faker};
        let mut payload: OpenResponse = Faker.fake();
        payload.code = ResponseCode::Ok;
        response_payload_test!(payload, ResponseKind::Open, COMMAND_OPEN);
    }

    // Every test below round-trips a faked payload under its command key.

    #[test]
    fn peer_properties_response_test() {
        response_test!(
            PeerPropertiesResponse,
            ResponseKind::PeerProperties,
            COMMAND_PEER_PROPERTIES
        );
    }

    #[test]
    fn sasl_handshake_response_test() {
        response_test!(
            SaslHandshakeResponse,
            ResponseKind::SaslHandshake,
            COMMAND_SASL_HANDSHAKE
        );
    }

    /// Uses `COMMAND_SASL_AUTHENTICATE`, one of the keys decoded as a generic
    /// response.
    #[test]
    fn generic_response_test() {
        response_test!(
            GenericResponse,
            ResponseKind::Generic,
            COMMAND_SASL_AUTHENTICATE
        );
    }

    #[test]
    fn tune_response_test() {
        response_test!(TunesCommand, ResponseKind::Tunes, COMMAND_TUNE);
    }

    #[test]
    fn metadata_response_test() {
        response_test!(MetadataResponse, ResponseKind::Metadata, COMMAND_METADATA);
    }

    #[test]
    fn close_response_test() {
        response_test!(CloseResponse, ResponseKind::Close, COMMAND_CLOSE);
    }

    #[test]
    fn deliver_response_test() {
        response_test!(DeliverCommand, ResponseKind::Deliver, COMMAND_DELIVER);
    }

    #[test]
    fn metadata_update_response_test() {
        response_test!(
            MetadataUpdateCommand,
            ResponseKind::MetadataUpdate,
            COMMAND_METADATA_UPDATE
        );
    }

    #[test]
    fn publish_confirm_response_test() {
        response_test!(
            PublishConfirm,
            ResponseKind::PublishConfirm,
            COMMAND_PUBLISH_CONFIRM
        );
    }

    #[test]
    fn publish_error_response_test() {
        response_test!(
            PublishErrorResponse,
            ResponseKind::PublishError,
            COMMAND_PUBLISH_ERROR
        );
    }

    #[test]
    fn query_offset_response_test() {
        response_test!(
            QueryOffsetResponse,
            ResponseKind::QueryOffset,
            COMMAND_QUERY_OFFSET
        );
    }

    #[test]
    fn query_publisher_response_test() {
        response_test!(
            QueryPublisherResponse,
            ResponseKind::QueryPublisherSequence,
            COMMAND_QUERY_PUBLISHER_SEQUENCE
        );
    }

    #[test]
    fn heartbeat_response_test() {
        response_test!(
            HeartbeatResponse,
            ResponseKind::Heartbeat,
            COMMAND_HEARTBEAT
        );
    }

    #[test]
    fn exchange_command_versions_response_test() {
        response_test!(
            ExchangeCommandVersionsResponse,
            ResponseKind::ExchangeCommandVersions,
            COMMAND_EXCHANGE_COMMAND_VERSIONS
        );
    }

    #[test]
    fn super_stream_partitions_response_test() {
        response_test!(
            SuperStreamPartitionsResponse,
            ResponseKind::SuperStreamPartitions,
            COMMAND_PARTITIONS
        );
    }

    #[test]
    fn super_stream_route_response_test() {
        response_test!(
            SuperStreamRouteResponse,
            ResponseKind::SuperStreamRoute,
            COMMAND_ROUTE
        );
    }

    #[test]
    fn consumer_update_response_test() {
        response_test!(
            ConsumerUpdateCommand,
            ResponseKind::ConsumerUpdate,
            COMMAND_CONSUMER_UPDATE
        );
    }
}