1use std::convert::TryInto;
2
3use crate::{
4 codec::{
5 decoder::{read_u16, read_u32},
6 Decoder,
7 },
8 commands::{
9 close::CloseResponse, consumer_update::ConsumerUpdateCommand,
10 consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse,
11 deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse,
12 generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse,
13 metadata_update::MetadataUpdateCommand, open::OpenResponse,
14 peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
15 publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
16 query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
17 superstream_partitions::SuperStreamPartitionsResponse,
18 superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
19 },
20 error::DecodeError,
21 protocol::commands::*,
22 types::Header,
23};
24
25mod shims;
26
/// Status code carried by a response.
///
/// The `Decoder` implementation for this type reads the code as a `u16` and
/// converts it with `TryInto`.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ResponseCode {
    Ok,
    StreamDoesNotExist,
    SubscriptionIdAlreadyExists,
    SubscriptionIdDoesNotExist,
    StreamAlreadyExists,
    StreamNotAvailable,
    SaslMechanismNotSupported,
    AuthenticationFailure,
    SaslError,
    // NOTE(review): "Challange" is misspelled, but the identifier is public,
    // so renaming it would break callers.
    SaslChallange,
    AuthenticationFailureLoopback,
    VirtualHostAccessFailure,
    UnknownFrame,
    FrameTooLarge,
    InternalError,
    AccessRefused,
    PreconditionFailed,
    PublisherDoesNotExist,
    OffsetNotFound,
}
50#[derive(Debug, PartialEq, Eq)]
51pub struct Response {
52 header: Header,
53 pub(crate) kind: ResponseKind,
54}
55
56#[derive(Debug, PartialEq, Eq)]
57pub enum ResponseKind {
58 Open(OpenResponse),
59 Close(CloseResponse),
60 PeerProperties(PeerPropertiesResponse),
61 SaslHandshake(SaslHandshakeResponse),
62 Generic(GenericResponse),
63 Tunes(TunesCommand),
64 Deliver(DeliverCommand),
65 Heartbeat(HeartbeatResponse),
66 Metadata(MetadataResponse),
67 MetadataUpdate(MetadataUpdateCommand),
68 PublishConfirm(PublishConfirm),
69 PublishError(PublishErrorResponse),
70 QueryOffset(QueryOffsetResponse),
71 QueryPublisherSequence(QueryPublisherResponse),
72 Credit(CreditResponse),
73 ExchangeCommandVersions(ExchangeCommandVersionsResponse),
74 SuperStreamPartitions(SuperStreamPartitionsResponse),
75 SuperStreamRoute(SuperStreamRouteResponse),
76 ConsumerUpdate(ConsumerUpdateCommand),
77 ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
78}
79
80impl Response {
81 pub fn new(header: Header, kind: ResponseKind) -> Self {
82 Self { header, kind }
83 }
84
85 pub fn correlation_id(&self) -> Option<u32> {
86 match &self.kind {
87 ResponseKind::Open(open) => Some(open.correlation_id),
88 ResponseKind::Close(close) => Some(close.correlation_id),
89 ResponseKind::PeerProperties(peer_properties) => Some(peer_properties.correlation_id),
90 ResponseKind::SaslHandshake(handshake) => Some(handshake.correlation_id),
91 ResponseKind::Generic(generic) => Some(generic.correlation_id),
92 ResponseKind::Metadata(metadata) => Some(metadata.correlation_id),
93 ResponseKind::QueryOffset(query_offset) => Some(query_offset.correlation_id),
94 ResponseKind::QueryPublisherSequence(query_publisher) => {
95 Some(query_publisher.correlation_id)
96 }
97 ResponseKind::MetadataUpdate(_) => None,
98 ResponseKind::PublishConfirm(_) => None,
99 ResponseKind::PublishError(_) => None,
100 ResponseKind::Tunes(_) => None,
101 ResponseKind::Heartbeat(_) => None,
102 ResponseKind::Deliver(_) => None,
103 ResponseKind::Credit(_) => None,
104 ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
105 Some(exchange_command_versions.correlation_id)
106 }
107 ResponseKind::SuperStreamPartitions(super_stream_partitions_command) => {
108 Some(super_stream_partitions_command.correlation_id)
109 }
110 ResponseKind::SuperStreamRoute(super_stream_route_command) => {
111 Some(super_stream_route_command.correlation_id)
112 }
113 ResponseKind::ConsumerUpdate(consumer_update_command) => {
114 Some(consumer_update_command.correlation_id)
115 }
116 ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => {
117 Some(consumer_update_request_command.correlation_id)
118 }
119 }
120 }
121
122 pub fn get<T>(self) -> Option<T>
123 where
124 T: FromResponse,
125 {
126 T::from_response(self)
127 }
128
129 pub fn kind_ref(&self) -> &ResponseKind {
130 &self.kind
131 }
132 pub fn kind(self) -> ResponseKind {
133 self.kind
134 }
135}
136
137impl Decoder for Response {
138 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
139 let (input, _) = read_u32(input)?;
140
141 let (input, header) = Header::decode(input)?;
142 let (input, kind) = match header.key() {
143 COMMAND_OPEN => {
144 OpenResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Open(kind)))?
145 }
146
147 COMMAND_CLOSE => {
148 CloseResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Close(kind)))?
149 }
150 COMMAND_PEER_PROPERTIES => PeerPropertiesResponse::decode(input)
151 .map(|(i, kind)| (i, ResponseKind::PeerProperties(kind)))?,
152 COMMAND_SASL_HANDSHAKE => SaslHandshakeResponse::decode(input)
153 .map(|(i, kind)| (i, ResponseKind::SaslHandshake(kind)))?,
154
155 COMMAND_DECLARE_PUBLISHER
156 | COMMAND_DELETE_PUBLISHER
157 | COMMAND_SASL_AUTHENTICATE
158 | COMMAND_SUBSCRIBE
159 | COMMAND_UNSUBSCRIBE
160 | COMMAND_CREATE_STREAM
161 | COMMAND_CREATE_SUPER_STREAM
162 | COMMAND_DELETE_SUPER_STREAM
163 | COMMAND_CONSUMER_UPDATE_REQUEST
164 | COMMAND_DELETE_STREAM => {
165 GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
166 }
167 COMMAND_TUNE => {
168 TunesCommand::decode(input).map(|(i, kind)| (i, ResponseKind::Tunes(kind)))?
169 }
170 COMMAND_DELIVER => DeliverCommand::decode(input)
171 .map(|(remaining, kind)| (remaining, ResponseKind::Deliver(kind)))?,
172
173 COMMAND_HEARTBEAT => HeartbeatResponse::decode(input)
174 .map(|(remaining, kind)| (remaining, ResponseKind::Heartbeat(kind)))?,
175 COMMAND_METADATA => MetadataResponse::decode(input)
176 .map(|(remaining, kind)| (remaining, ResponseKind::Metadata(kind)))?,
177 COMMAND_METADATA_UPDATE => MetadataUpdateCommand::decode(input)
178 .map(|(remaining, kind)| (remaining, ResponseKind::MetadataUpdate(kind)))?,
179 COMMAND_PUBLISH_CONFIRM => PublishConfirm::decode(input)
180 .map(|(remaining, kind)| (remaining, ResponseKind::PublishConfirm(kind)))?,
181
182 COMMAND_PUBLISH_ERROR => PublishErrorResponse::decode(input)
183 .map(|(remaining, kind)| (remaining, ResponseKind::PublishError(kind)))?,
184
185 COMMAND_QUERY_OFFSET => QueryOffsetResponse::decode(input)
186 .map(|(remaining, kind)| (remaining, ResponseKind::QueryOffset(kind)))?,
187
188 COMMAND_CREDIT => CreditResponse::decode(input)
189 .map(|(remaining, kind)| (remaining, ResponseKind::Credit(kind)))?,
190
191 COMMAND_QUERY_PUBLISHER_SEQUENCE => QueryPublisherResponse::decode(input)
192 .map(|(remaining, kind)| (remaining, ResponseKind::QueryPublisherSequence(kind)))?,
193 COMMAND_EXCHANGE_COMMAND_VERSIONS => ExchangeCommandVersionsResponse::decode(input)
194 .map(|(remaining, kind)| {
195 (remaining, ResponseKind::ExchangeCommandVersions(kind))
196 })?,
197 COMMAND_PARTITIONS => SuperStreamPartitionsResponse::decode(input)
198 .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?,
199 COMMAND_ROUTE => SuperStreamRouteResponse::decode(input)
200 .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?,
201 COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input)
202 .map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?,
203 n => return Err(DecodeError::UnsupportedResponseType(n)),
204 };
205 Ok((input, Response { header, kind }))
206 }
207}
208
209impl Decoder for ResponseCode {
210 fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
211 let (input, code) = read_u16(input)?;
212 Ok((input, code.try_into()?))
213 }
214}
215
216pub trait FromResponse
217where
218 Self: Sized,
219{
220 fn from_response(response: Response) -> Option<Self>;
221}
222
#[cfg(test)]
mod tests {

    use byteorder::{BigEndian, WriteBytesExt};

    use super::{Response, ResponseKind};
    use crate::{
        codec::{Decoder, Encoder},
        commands::{
            close::CloseResponse, consumer_update::ConsumerUpdateCommand, deliver::DeliverCommand,
            exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
            heart_beat::HeartbeatResponse, metadata::MetadataResponse,
            metadata_update::MetadataUpdateCommand, open::OpenResponse,
            peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
            publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
            query_publisher_sequence::QueryPublisherResponse,
            sasl_handshake::SaslHandshakeResponse,
            superstream_partitions::SuperStreamPartitionsResponse,
            superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
        },
        protocol::{
            commands::{
                COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST,
                COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE,
                COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM,
                COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE,
                COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
            },
            version::PROTOCOL_VERSION,
        },
        response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
        types::Header,
        ResponseCode,
    };

    /// Expands to an exhaustive `match` over every `ResponseKind` variant,
    /// binding the wrapped value to `$inner` and evaluating `$body` for it.
    macro_rules! with_inner {
        ($kind:expr, $inner:ident => $body:expr) => {
            match $kind {
                ResponseKind::Open($inner) => $body,
                ResponseKind::Close($inner) => $body,
                ResponseKind::PeerProperties($inner) => $body,
                ResponseKind::SaslHandshake($inner) => $body,
                ResponseKind::Generic($inner) => $body,
                ResponseKind::Tunes($inner) => $body,
                ResponseKind::Heartbeat($inner) => $body,
                ResponseKind::Deliver($inner) => $body,
                ResponseKind::Metadata($inner) => $body,
                ResponseKind::MetadataUpdate($inner) => $body,
                ResponseKind::PublishConfirm($inner) => $body,
                ResponseKind::PublishError($inner) => $body,
                ResponseKind::QueryOffset($inner) => $body,
                ResponseKind::QueryPublisherSequence($inner) => $body,
                ResponseKind::Credit($inner) => $body,
                ResponseKind::ExchangeCommandVersions($inner) => $body,
                ResponseKind::SuperStreamPartitions($inner) => $body,
                ResponseKind::SuperStreamRoute($inner) => $body,
                ResponseKind::ConsumerUpdate($inner) => $body,
                ResponseKind::ConsumerUpdateRequest($inner) => $body,
            }
        };
    }

    // Test-only encoder: forwards to the encoder of whichever body is wrapped.
    impl Encoder for ResponseKind {
        fn encoded_size(&self) -> u32 {
            with_inner!(self, inner => inner.encoded_size())
        }

        fn encode(
            &self,
            writer: &mut impl std::io::Write,
        ) -> Result<(), crate::error::EncodeError> {
            with_inner!(self, inner => inner.encode(writer))
        }
    }

    // Test-only encoder producing the frame layout that `Response::decode`
    // consumes: a u32 size prefix, then the header, then the body.
    impl Encoder for Response {
        fn encoded_size(&self) -> u32 {
            let header_len = self.header.encoded_size();
            let body_len = self.kind.encoded_size();
            // NOTE(review): nothing written by `encode` below obviously
            // accounts for the extra 2 bytes. `Response::decode` discards the
            // size prefix, so the round-trip tests cannot detect a wrong
            // value here — confirm.
            header_len + 2 + body_len
        }

        fn encode(
            &self,
            writer: &mut impl std::io::Write,
        ) -> Result<(), crate::error::EncodeError> {
            let frame_size = self.encoded_size();
            writer.write_u32::<BigEndian>(frame_size)?;
            self.header.encode(writer)?;
            self.kind.encode(writer)
        }
    }

    /// Round-trips a response built from an explicit payload expression:
    /// encode it, decode the bytes, then check equality and that no bytes
    /// are left over.
    macro_rules! response_payload_test {
        ($payload:expr, $variant:path, $cmd:expr) => {
            let frame = Response {
                header: Header::new($cmd, PROTOCOL_VERSION),
                kind: $variant($payload),
            };

            let mut bytes = vec![];
            frame.encode(&mut bytes).unwrap();

            let (leftover, roundtripped) = Response::decode(&bytes).unwrap();

            assert_eq!(frame, roundtripped);
            assert!(leftover.is_empty());
        };
    }

    /// Same round-trip check as `response_payload_test!`, with a payload of
    /// type `$ty` generated by `fake`.
    macro_rules! response_test {
        ($ty:ty, $variant:path, $cmd:expr) => {
            use fake::{Fake, Faker};
            let payload: $ty = Faker.fake();
            response_payload_test!(payload, $variant, $cmd);
        };
    }

    #[test]
    fn open_response_ok_test() {
        use {fake::Fake, fake::Faker};
        let mut response: OpenResponse = Faker.fake();
        response.code = ResponseCode::Ok;
        response_payload_test!(response, ResponseKind::Open, COMMAND_OPEN);
    }

    #[test]
    fn peer_properties_response_test() {
        response_test!(
            PeerPropertiesResponse,
            ResponseKind::PeerProperties,
            COMMAND_PEER_PROPERTIES
        );
    }

    #[test]
    fn sasl_handshake_response_test() {
        response_test!(
            SaslHandshakeResponse,
            ResponseKind::SaslHandshake,
            COMMAND_SASL_HANDSHAKE
        );
    }

    #[test]
    fn generic_response_test() {
        response_test!(
            GenericResponse,
            ResponseKind::Generic,
            COMMAND_SASL_AUTHENTICATE
        );
    }

    #[test]
    fn tune_response_test() {
        response_test!(TunesCommand, ResponseKind::Tunes, COMMAND_TUNE);
    }

    #[test]
    fn metadata_response_test() {
        response_test!(MetadataResponse, ResponseKind::Metadata, COMMAND_METADATA);
    }

    #[test]
    fn close_response_test() {
        response_test!(CloseResponse, ResponseKind::Close, COMMAND_CLOSE);
    }

    #[test]
    fn deliver_response_test() {
        response_test!(DeliverCommand, ResponseKind::Deliver, COMMAND_DELIVER);
    }

    #[test]
    fn metadata_update_response_test() {
        response_test!(
            MetadataUpdateCommand,
            ResponseKind::MetadataUpdate,
            COMMAND_METADATA_UPDATE
        );
    }

    #[test]
    fn publish_confirm_response_test() {
        response_test!(
            PublishConfirm,
            ResponseKind::PublishConfirm,
            COMMAND_PUBLISH_CONFIRM
        );
    }

    #[test]
    fn publish_error_response_test() {
        response_test!(
            PublishErrorResponse,
            ResponseKind::PublishError,
            COMMAND_PUBLISH_ERROR
        );
    }

    #[test]
    fn query_offset_response_test() {
        response_test!(
            QueryOffsetResponse,
            ResponseKind::QueryOffset,
            COMMAND_QUERY_OFFSET
        );
    }

    #[test]
    fn query_publisher_response_test() {
        response_test!(
            QueryPublisherResponse,
            ResponseKind::QueryPublisherSequence,
            COMMAND_QUERY_PUBLISHER_SEQUENCE
        );
    }

    #[test]
    fn heartbeat_response_test() {
        response_test!(
            HeartbeatResponse,
            ResponseKind::Heartbeat,
            COMMAND_HEARTBEAT
        );
    }

    #[test]
    fn exchange_command_versions_response_test() {
        response_test!(
            ExchangeCommandVersionsResponse,
            ResponseKind::ExchangeCommandVersions,
            COMMAND_EXCHANGE_COMMAND_VERSIONS
        );
    }

    #[test]
    fn super_stream_partitions_response_test() {
        response_test!(
            SuperStreamPartitionsResponse,
            ResponseKind::SuperStreamPartitions,
            COMMAND_PARTITIONS
        );
    }

    #[test]
    fn super_stream_route_response_test() {
        response_test!(
            SuperStreamRouteResponse,
            ResponseKind::SuperStreamRoute,
            COMMAND_ROUTE
        );
    }

    #[test]
    fn consumer_update_response_test() {
        response_test!(
            ConsumerUpdateCommand,
            ResponseKind::ConsumerUpdate,
            COMMAND_CONSUMER_UPDATE
        );
    }
}