1use std::convert::TryInto;
2
3use crate::{
4 codec::{
5 decoder::{read_u16, read_u32},
6 Decoder,
7 },
8 commands::{
9 close::CloseResponse, consumer_update::ConsumerUpdateCommand,
10 consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse,
11 deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse,
12 generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse,
13 metadata_update::MetadataUpdateCommand, open::OpenResponse,
14 peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
15 publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
16 query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
17 superstream_partitions::SuperStreamPartitionsResponse,
18 superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
19 },
20 error::DecodeError,
21 protocol::commands::*,
22 types::Header,
23};
24
25mod shims;
26
/// Status code carried by responses.
///
/// On the wire a code is a `u16`: the `Decoder` impl for this type reads one
/// with `read_u16` and converts it with `TryInto`. The numeric value of each
/// variant is not defined in this block — presumably the conversion lives in
/// the `shims` submodule; confirm there.
///
/// NOTE(review): `SaslChallange` and `PrecoditionFailed` look like
/// misspellings of "Challenge" and "Precondition". They are public variant
/// names, so renaming them would be a breaking change.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ResponseCode {
    Ok,
    StreamDoesNotExist,
    SubscriptionIdAlreadyExists,
    SubscriptionIdDoesNotExist,
    StreamAlreadyExists,
    StreamNotAvailable,
    SaslMechanismNotSupported,
    AuthenticationFailure,
    SaslError,
    SaslChallange,
    AuthenticationFailureLoopback,
    VirtualHostAccessFailure,
    UnknownFrame,
    FrameTooLarge,
    InternalError,
    AccessRefused,
    PrecoditionFailed,
    PublisherDoesNotExist,
    OffsetNotFound,
}
/// A decoded frame: the frame [`Header`] together with its command-specific
/// payload.
#[derive(Debug, PartialEq, Eq)]
pub struct Response {
    /// Header decoded ahead of the payload; its key selects the payload type.
    header: Header,
    /// Command-specific payload.
    pub(crate) kind: ResponseKind,
}
55
/// Payload of a [`Response`], one variant per payload type.
///
/// `Generic` is shared by several command keys whose payload is decoded as a
/// plain [`GenericResponse`]; every other variant wraps a dedicated type.
#[derive(Debug, PartialEq, Eq)]
pub enum ResponseKind {
    Open(OpenResponse),
    Close(CloseResponse),
    PeerProperties(PeerPropertiesResponse),
    SaslHandshake(SaslHandshakeResponse),
    Generic(GenericResponse),
    Tunes(TunesCommand),
    Deliver(DeliverCommand),
    Heartbeat(HeartbeatResponse),
    Metadata(MetadataResponse),
    MetadataUpdate(MetadataUpdateCommand),
    PublishConfirm(PublishConfirm),
    PublishError(PublishErrorResponse),
    QueryOffset(QueryOffsetResponse),
    QueryPublisherSequence(QueryPublisherResponse),
    Credit(CreditResponse),
    ExchangeCommandVersions(ExchangeCommandVersionsResponse),
    SuperStreamPartitions(SuperStreamPartitionsResponse),
    SuperStreamRoute(SuperStreamRouteResponse),
    ConsumerUpdate(ConsumerUpdateCommand),
    ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
}
79
80impl Response {
81 pub fn new(header: Header, kind: ResponseKind) -> Self {
82 Self { header, kind }
83 }
84
85 pub fn correlation_id(&self) -> Option<u32> {
86 match &self.kind {
87 ResponseKind::Open(open) => Some(open.correlation_id),
88 ResponseKind::Close(close) => Some(close.correlation_id),
89 ResponseKind::PeerProperties(peer_properties) => Some(peer_properties.correlation_id),
90 ResponseKind::SaslHandshake(handshake) => Some(handshake.correlation_id),
91 ResponseKind::Generic(generic) => Some(generic.correlation_id),
92 ResponseKind::Metadata(metadata) => Some(metadata.correlation_id),
93 ResponseKind::QueryOffset(query_offset) => Some(query_offset.correlation_id),
94 ResponseKind::QueryPublisherSequence(query_publisher) => {
95 Some(query_publisher.correlation_id)
96 }
97 ResponseKind::MetadataUpdate(_) => None,
98 ResponseKind::PublishConfirm(_) => None,
99 ResponseKind::PublishError(_) => None,
100 ResponseKind::Tunes(_) => None,
101 ResponseKind::Heartbeat(_) => None,
102 ResponseKind::Deliver(_) => None,
103 ResponseKind::Credit(_) => None,
104 ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
105 Some(exchange_command_versions.correlation_id)
106 }
107 ResponseKind::SuperStreamPartitions(super_stream_partitions_command) => {
108 Some(super_stream_partitions_command.correlation_id)
109 }
110 ResponseKind::SuperStreamRoute(super_stream_route_command) => {
111 Some(super_stream_route_command.correlation_id)
112 }
113 ResponseKind::ConsumerUpdate(consumer_update_command) => {
114 Some(consumer_update_command.correlation_id)
115 }
116 ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => {
117 Some(consumer_update_request_command.correlation_id)
118 }
119 }
120 }
121
122 pub fn get<T>(self) -> Option<T>
123 where
124 T: FromResponse,
125 {
126 T::from_response(self)
127 }
128
129 pub fn kind_ref(&self) -> &ResponseKind {
130 &self.kind
131 }
132 pub fn kind(self) -> ResponseKind {
133 self.kind
134 }
135}
136
137impl Decoder for Response {
138 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
139 let (input, _) = read_u32(input)?;
140
141 let (input, header) = Header::decode(input)?;
142 let (input, kind) = match header.key() {
143 COMMAND_OPEN => {
144 OpenResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Open(kind)))?
145 }
146
147 COMMAND_CLOSE => {
148 CloseResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Close(kind)))?
149 }
150 COMMAND_PEER_PROPERTIES => PeerPropertiesResponse::decode(input)
151 .map(|(i, kind)| (i, ResponseKind::PeerProperties(kind)))?,
152 COMMAND_SASL_HANDSHAKE => SaslHandshakeResponse::decode(input)
153 .map(|(i, kind)| (i, ResponseKind::SaslHandshake(kind)))?,
154
155 COMMAND_DECLARE_PUBLISHER
156 | COMMAND_DELETE_PUBLISHER
157 | COMMAND_SASL_AUTHENTICATE
158 | COMMAND_SUBSCRIBE
159 | COMMAND_UNSUBSCRIBE
160 | COMMAND_CREATE_STREAM
161 | COMMAND_CREATE_SUPER_STREAM
162 | COMMAND_DELETE_SUPER_STREAM
163 | COMMAND_CONSUMER_UPDATE_REQUEST
164 | COMMAND_DELETE_STREAM => {
165 GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
166 }
167 COMMAND_TUNE => {
168 TunesCommand::decode(input).map(|(i, kind)| (i, ResponseKind::Tunes(kind)))?
169 }
170 COMMAND_DELIVER => DeliverCommand::decode(input)
171 .map(|(remaining, kind)| (remaining, ResponseKind::Deliver(kind)))?,
172
173 COMMAND_HEARTBEAT => HeartbeatResponse::decode(input)
174 .map(|(remaining, kind)| (remaining, ResponseKind::Heartbeat(kind)))?,
175 COMMAND_METADATA => MetadataResponse::decode(input)
176 .map(|(remaining, kind)| (remaining, ResponseKind::Metadata(kind)))?,
177 COMMAND_METADATA_UPDATE => MetadataUpdateCommand::decode(input)
178 .map(|(remaining, kind)| (remaining, ResponseKind::MetadataUpdate(kind)))?,
179 COMMAND_PUBLISH_CONFIRM => PublishConfirm::decode(input)
180 .map(|(remaining, kind)| (remaining, ResponseKind::PublishConfirm(kind)))?,
181
182 COMMAND_PUBLISH_ERROR => PublishErrorResponse::decode(input)
183 .map(|(remaining, kind)| (remaining, ResponseKind::PublishError(kind)))?,
184
185 COMMAND_QUERY_OFFSET => QueryOffsetResponse::decode(input)
186 .map(|(remaining, kind)| (remaining, ResponseKind::QueryOffset(kind)))?,
187
188 COMMAND_CREDIT => CreditResponse::decode(input)
189 .map(|(remaining, kind)| (remaining, ResponseKind::Credit(kind)))?,
190
191 COMMAND_QUERY_PUBLISHER_SEQUENCE => QueryPublisherResponse::decode(input)
192 .map(|(remaining, kind)| (remaining, ResponseKind::QueryPublisherSequence(kind)))?,
193 COMMAND_EXCHANGE_COMMAND_VERSIONS => ExchangeCommandVersionsResponse::decode(input)
194 .map(|(remaining, kind)| {
195 (remaining, ResponseKind::ExchangeCommandVersions(kind))
196 })?,
197 COMMAND_PARTITIONS => SuperStreamPartitionsResponse::decode(input)
198 .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?,
199 COMMAND_ROUTE => SuperStreamRouteResponse::decode(input)
200 .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?,
201 COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input)
202 .map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?,
203 n => return Err(DecodeError::UnsupportedResponseType(n)),
204 };
205 Ok((input, Response { header, kind }))
206 }
207}
208
209impl Decoder for ResponseCode {
210 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
211 let (input, code) = read_u16(input)?;
212 Ok((input, code.try_into()?))
213 }
214}
215
216pub trait FromResponse
217where
218 Self: Sized,
219{
220 fn from_response(response: Response) -> Option<Self>;
221}
222
223#[cfg(test)]
224mod tests {
225
226 use byteorder::{BigEndian, WriteBytesExt};
227
228 use super::{Response, ResponseKind};
229 use crate::{
230 codec::{Decoder, Encoder},
231 commands::{
232 close::CloseResponse, consumer_update::ConsumerUpdateCommand,
233 consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand,
234 exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
235 heart_beat::HeartbeatResponse, metadata::MetadataResponse,
236 metadata_update::MetadataUpdateCommand, open::OpenResponse,
237 peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
238 publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
239 query_publisher_sequence::QueryPublisherResponse,
240 sasl_handshake::SaslHandshakeResponse,
241 superstream_partitions::SuperStreamPartitionsResponse,
242 superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
243 },
244 protocol::{
245 commands::{
246 COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST,
247 COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE,
248 COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM,
249 COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE,
250 COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
251 },
252 version::PROTOCOL_VERSION,
253 },
254 response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
255 types::Header,
256 ResponseCode,
257 };
258
259 impl Encoder for ResponseKind {
260 fn encoded_size(&self) -> u32 {
261 match self {
262 ResponseKind::Open(open) => open.encoded_size(),
263 ResponseKind::Close(close) => close.encoded_size(),
264 ResponseKind::PeerProperties(peer_properties) => peer_properties.encoded_size(),
265 ResponseKind::SaslHandshake(handshake) => handshake.encoded_size(),
266 ResponseKind::Generic(generic) => generic.encoded_size(),
267 ResponseKind::Tunes(tune) => tune.encoded_size(),
268 ResponseKind::Heartbeat(heartbeat) => heartbeat.encoded_size(),
269 ResponseKind::Deliver(deliver) => deliver.encoded_size(),
270 ResponseKind::Metadata(metadata) => metadata.encoded_size(),
271 ResponseKind::MetadataUpdate(metadata) => metadata.encoded_size(),
272 ResponseKind::PublishConfirm(publish_confirm) => publish_confirm.encoded_size(),
273 ResponseKind::PublishError(publish_error) => publish_error.encoded_size(),
274 ResponseKind::QueryOffset(query_offset) => query_offset.encoded_size(),
275 ResponseKind::QueryPublisherSequence(query_publisher) => {
276 query_publisher.encoded_size()
277 }
278 ResponseKind::Credit(credit) => credit.encoded_size(),
279 ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
280 exchange_command_versions.encoded_size()
281 }
282 ResponseKind::SuperStreamPartitions(super_stream_response) => {
283 super_stream_response.encoded_size()
284 }
285 ResponseKind::SuperStreamRoute(super_stream_response) => {
286 super_stream_response.encoded_size()
287 }
288 ResponseKind::ConsumerUpdate(consumer_update_response) => {
289 consumer_update_response.encoded_size()
290 }
291 ResponseKind::ConsumerUpdateRequest(consumer_update_request_response) => {
292 consumer_update_request_response.encoded_size()
293 }
294 }
295 }
296
297 fn encode(
298 &self,
299 writer: &mut impl std::io::Write,
300 ) -> Result<(), crate::error::EncodeError> {
301 match self {
302 ResponseKind::Open(open) => open.encode(writer),
303 ResponseKind::Close(close) => close.encode(writer),
304 ResponseKind::PeerProperties(peer_properties) => peer_properties.encode(writer),
305 ResponseKind::SaslHandshake(handshake) => handshake.encode(writer),
306 ResponseKind::Generic(generic) => generic.encode(writer),
307 ResponseKind::Tunes(tune) => tune.encode(writer),
308 ResponseKind::Heartbeat(heartbeat) => heartbeat.encode(writer),
309 ResponseKind::Deliver(deliver) => deliver.encode(writer),
310 ResponseKind::Metadata(metadata) => metadata.encode(writer),
311 ResponseKind::MetadataUpdate(metadata) => metadata.encode(writer),
312 ResponseKind::PublishConfirm(publish_confirm) => publish_confirm.encode(writer),
313 ResponseKind::PublishError(publish_error) => publish_error.encode(writer),
314 ResponseKind::QueryOffset(query_offset) => query_offset.encode(writer),
315 ResponseKind::QueryPublisherSequence(query_publisher) => {
316 query_publisher.encode(writer)
317 }
318 ResponseKind::Credit(credit) => credit.encode(writer),
319 ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
320 exchange_command_versions.encode(writer)
321 }
322 ResponseKind::SuperStreamPartitions(super_stream_command_versions) => {
323 super_stream_command_versions.encode(writer)
324 }
325 ResponseKind::SuperStreamRoute(super_stream_command_versions) => {
326 super_stream_command_versions.encode(writer)
327 }
328 ResponseKind::ConsumerUpdate(consumer_update_command_version) => {
329 consumer_update_command_version.encode(writer)
330 }
331 ResponseKind::ConsumerUpdateRequest(consumer_update_request_command_version) => {
332 consumer_update_request_command_version.encode(writer)
333 }
334 }
335 }
336 }
337
338 impl Encoder for Response {
339 fn encoded_size(&self) -> u32 {
340 self.header.encoded_size() + 2 + self.kind.encoded_size()
341 }
342
343 fn encode(
344 &self,
345 writer: &mut impl std::io::Write,
346 ) -> Result<(), crate::error::EncodeError> {
347 writer.write_u32::<BigEndian>(self.encoded_size())?;
348 self.header.encode(writer)?;
349 self.kind.encode(writer)?;
350 Ok(())
351 }
352 }
353
354 macro_rules! response_test {
355 ($ty:ty, $variant:path, $cmd:expr) => {
356 use fake::{Fake, Faker};
357 let payload: $ty = Faker.fake();
358
359 let response = Response {
360 header: Header::new($cmd, PROTOCOL_VERSION),
361 kind: $variant(payload),
362 };
363
364 let mut buffer = vec![];
365
366 response.encode(&mut buffer).unwrap();
367
368 let (remaining, decoded) = Response::decode(&buffer).unwrap();
369
370 assert_eq!(response, decoded);
371
372 assert!(remaining.is_empty());
373 };
374 }
375
376 macro_rules! response_payload_test {
377 ($payload:expr, $variant:path, $cmd:expr) => {
378 let response = Response {
379 header: Header::new($cmd, PROTOCOL_VERSION),
380 kind: $variant($payload),
381 };
382
383 let mut buffer = vec![];
384
385 response.encode(&mut buffer).unwrap();
386
387 let (remaining, decoded) = Response::decode(&buffer).unwrap();
388
389 assert_eq!(response, decoded);
390
391 assert!(remaining.is_empty());
392 };
393 }
394
395 #[test]
396 fn open_response_ok_test() {
397 use {fake::Fake, fake::Faker};
398 let mut response: OpenResponse = Faker.fake();
399 response.code = ResponseCode::Ok;
400 response_payload_test!(response, ResponseKind::Open, COMMAND_OPEN);
401 }
402
    /// Encode/decode round trip of a generated `PeerPropertiesResponse`
    /// framed under `COMMAND_PEER_PROPERTIES`.
    #[test]
    fn peer_properties_response_test() {
        response_test!(
            PeerPropertiesResponse,
            ResponseKind::PeerProperties,
            COMMAND_PEER_PROPERTIES
        );
    }
411
    /// Encode/decode round trip of a generated `SaslHandshakeResponse`
    /// framed under `COMMAND_SASL_HANDSHAKE`.
    #[test]
    fn sasl_handshake_response_test() {
        response_test!(
            SaslHandshakeResponse,
            ResponseKind::SaslHandshake,
            COMMAND_SASL_HANDSHAKE
        );
    }
420
    /// Encode/decode round trip of a generated `GenericResponse`.
    ///
    /// Only `COMMAND_SASL_AUTHENTICATE` is exercised here; the other keys
    /// that decode to `ResponseKind::Generic` are not covered by this test.
    #[test]
    fn generic_response_test() {
        response_test!(
            GenericResponse,
            ResponseKind::Generic,
            COMMAND_SASL_AUTHENTICATE
        );
    }
429
    /// Encode/decode round trip of a generated `TunesCommand` framed under
    /// `COMMAND_TUNE`.
    #[test]
    fn tune_response_test() {
        response_test!(TunesCommand, ResponseKind::Tunes, COMMAND_TUNE);
    }
    /// Encode/decode round trip of a generated `MetadataResponse` framed
    /// under `COMMAND_METADATA`.
    #[test]
    fn metadata_response_test() {
        response_test!(MetadataResponse, ResponseKind::Metadata, COMMAND_METADATA);
    }
    /// Encode/decode round trip of a generated `CloseResponse` framed under
    /// `COMMAND_CLOSE`.
    #[test]
    fn close_response_test() {
        response_test!(CloseResponse, ResponseKind::Close, COMMAND_CLOSE);
    }
    /// Encode/decode round trip of a generated `DeliverCommand` framed under
    /// `COMMAND_DELIVER`.
    #[test]
    fn deliver_response_test() {
        response_test!(DeliverCommand, ResponseKind::Deliver, COMMAND_DELIVER);
    }
    /// Encode/decode round trip of a generated `MetadataUpdateCommand`
    /// framed under `COMMAND_METADATA_UPDATE`.
    #[test]
    fn metadata_update_response_test() {
        response_test!(
            MetadataUpdateCommand,
            ResponseKind::MetadataUpdate,
            COMMAND_METADATA_UPDATE
        );
    }
    /// Encode/decode round trip of a generated `PublishConfirm` framed under
    /// `COMMAND_PUBLISH_CONFIRM`.
    #[test]
    fn publish_confirm_response_test() {
        response_test!(
            PublishConfirm,
            ResponseKind::PublishConfirm,
            COMMAND_PUBLISH_CONFIRM
        );
    }
    /// Encode/decode round trip of a generated `PublishErrorResponse` framed
    /// under `COMMAND_PUBLISH_ERROR`.
    #[test]
    fn publish_error_response_test() {
        response_test!(
            PublishErrorResponse,
            ResponseKind::PublishError,
            COMMAND_PUBLISH_ERROR
        );
    }
    /// Encode/decode round trip of a generated `QueryOffsetResponse` framed
    /// under `COMMAND_QUERY_OFFSET`.
    #[test]
    fn query_offset_response_test() {
        response_test!(
            QueryOffsetResponse,
            ResponseKind::QueryOffset,
            COMMAND_QUERY_OFFSET
        );
    }
    /// Encode/decode round trip of a generated `QueryPublisherResponse`
    /// framed under `COMMAND_QUERY_PUBLISHER_SEQUENCE`.
    #[test]
    fn query_publisher_response_test() {
        response_test!(
            QueryPublisherResponse,
            ResponseKind::QueryPublisherSequence,
            COMMAND_QUERY_PUBLISHER_SEQUENCE
        );
    }
    /// Encode/decode round trip of a generated `HeartbeatResponse` framed
    /// under `COMMAND_HEARTBEAT`.
    #[test]
    fn heartbeat_response_test() {
        response_test!(
            HeartbeatResponse,
            ResponseKind::Heartbeat,
            COMMAND_HEARTBEAT
        );
    }
494
    /// Encode/decode round trip of a generated
    /// `ExchangeCommandVersionsResponse` framed under
    /// `COMMAND_EXCHANGE_COMMAND_VERSIONS`.
    #[test]
    fn exchange_command_versions_response_test() {
        response_test!(
            ExchangeCommandVersionsResponse,
            ResponseKind::ExchangeCommandVersions,
            COMMAND_EXCHANGE_COMMAND_VERSIONS
        );
    }
503
    /// Encode/decode round trip of a generated
    /// `SuperStreamPartitionsResponse` framed under `COMMAND_PARTITIONS`.
    #[test]
    fn super_stream_partitions_response_test() {
        response_test!(
            SuperStreamPartitionsResponse,
            ResponseKind::SuperStreamPartitions,
            COMMAND_PARTITIONS
        );
    }
512
    /// Encode/decode round trip of a generated `SuperStreamRouteResponse`
    /// framed under `COMMAND_ROUTE`.
    #[test]
    fn super_stream_route_response_test() {
        response_test!(
            SuperStreamRouteResponse,
            ResponseKind::SuperStreamRoute,
            COMMAND_ROUTE
        );
    }
521
    /// Encode/decode round trip of a generated `ConsumerUpdateCommand`
    /// framed under `COMMAND_CONSUMER_UPDATE`.
    #[test]
    fn consumer_update_response_test() {
        response_test!(
            ConsumerUpdateCommand,
            ResponseKind::ConsumerUpdate,
            COMMAND_CONSUMER_UPDATE
        );
    }
530}