rabbitmq_stream_protocol/response/
mod.rs

1use std::convert::TryInto;
2
3use crate::{
4    codec::{
5        decoder::{read_u16, read_u32},
6        Decoder,
7    },
8    commands::{
9        close::CloseResponse, consumer_update::ConsumerUpdateCommand,
10        consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse,
11        deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse,
12        generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse,
13        metadata_update::MetadataUpdateCommand, open::OpenResponse,
14        peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
15        publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
16        query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
17        superstream_partitions::SuperStreamPartitionsResponse,
18        superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
19    },
20    error::DecodeError,
21    protocol::commands::*,
22    types::Header,
23};
24
25mod shims;
26
27#[cfg_attr(test, derive(fake::Dummy))]
28#[derive(Debug, PartialEq, Eq, Clone)]
29pub enum ResponseCode {
30    Ok,
31    StreamDoesNotExist,
32    SubscriptionIdAlreadyExists,
33    SubscriptionIdDoesNotExist,
34    StreamAlreadyExists,
35    StreamNotAvailable,
36    SaslMechanismNotSupported,
37    AuthenticationFailure,
38    SaslError,
39    SaslChallange,
40    AuthenticationFailureLoopback,
41    VirtualHostAccessFailure,
42    UnknownFrame,
43    FrameTooLarge,
44    InternalError,
45    AccessRefused,
46    PreconditionFailed,
47    PublisherDoesNotExist,
48    OffsetNotFound,
49}
50#[derive(Debug, PartialEq, Eq)]
51pub struct Response {
52    header: Header,
53    pub(crate) kind: ResponseKind,
54}
55
56#[derive(Debug, PartialEq, Eq)]
57pub enum ResponseKind {
58    Open(OpenResponse),
59    Close(CloseResponse),
60    PeerProperties(PeerPropertiesResponse),
61    SaslHandshake(SaslHandshakeResponse),
62    Generic(GenericResponse),
63    Tunes(TunesCommand),
64    Deliver(DeliverCommand),
65    Heartbeat(HeartbeatResponse),
66    Metadata(MetadataResponse),
67    MetadataUpdate(MetadataUpdateCommand),
68    PublishConfirm(PublishConfirm),
69    PublishError(PublishErrorResponse),
70    QueryOffset(QueryOffsetResponse),
71    QueryPublisherSequence(QueryPublisherResponse),
72    Credit(CreditResponse),
73    ExchangeCommandVersions(ExchangeCommandVersionsResponse),
74    SuperStreamPartitions(SuperStreamPartitionsResponse),
75    SuperStreamRoute(SuperStreamRouteResponse),
76    ConsumerUpdate(ConsumerUpdateCommand),
77    ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
78}
79
80impl Response {
81    pub fn new(header: Header, kind: ResponseKind) -> Self {
82        Self { header, kind }
83    }
84
85    pub fn correlation_id(&self) -> Option<u32> {
86        match &self.kind {
87            ResponseKind::Open(open) => Some(open.correlation_id),
88            ResponseKind::Close(close) => Some(close.correlation_id),
89            ResponseKind::PeerProperties(peer_properties) => Some(peer_properties.correlation_id),
90            ResponseKind::SaslHandshake(handshake) => Some(handshake.correlation_id),
91            ResponseKind::Generic(generic) => Some(generic.correlation_id),
92            ResponseKind::Metadata(metadata) => Some(metadata.correlation_id),
93            ResponseKind::QueryOffset(query_offset) => Some(query_offset.correlation_id),
94            ResponseKind::QueryPublisherSequence(query_publisher) => {
95                Some(query_publisher.correlation_id)
96            }
97            ResponseKind::MetadataUpdate(_) => None,
98            ResponseKind::PublishConfirm(_) => None,
99            ResponseKind::PublishError(_) => None,
100            ResponseKind::Tunes(_) => None,
101            ResponseKind::Heartbeat(_) => None,
102            ResponseKind::Deliver(_) => None,
103            ResponseKind::Credit(_) => None,
104            ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
105                Some(exchange_command_versions.correlation_id)
106            }
107            ResponseKind::SuperStreamPartitions(super_stream_partitions_command) => {
108                Some(super_stream_partitions_command.correlation_id)
109            }
110            ResponseKind::SuperStreamRoute(super_stream_route_command) => {
111                Some(super_stream_route_command.correlation_id)
112            }
113            ResponseKind::ConsumerUpdate(consumer_update_command) => {
114                Some(consumer_update_command.correlation_id)
115            }
116            ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => {
117                Some(consumer_update_request_command.correlation_id)
118            }
119        }
120    }
121
122    pub fn get<T>(self) -> Option<T>
123    where
124        T: FromResponse,
125    {
126        T::from_response(self)
127    }
128
129    pub fn kind_ref(&self) -> &ResponseKind {
130        &self.kind
131    }
132    pub fn kind(self) -> ResponseKind {
133        self.kind
134    }
135}
136
137impl Decoder for Response {
138    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
139        let (input, _) = read_u32(input)?;
140
141        let (input, header) = Header::decode(input)?;
142        let (input, kind) = match header.key() {
143            COMMAND_OPEN => {
144                OpenResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Open(kind)))?
145            }
146
147            COMMAND_CLOSE => {
148                CloseResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Close(kind)))?
149            }
150            COMMAND_PEER_PROPERTIES => PeerPropertiesResponse::decode(input)
151                .map(|(i, kind)| (i, ResponseKind::PeerProperties(kind)))?,
152            COMMAND_SASL_HANDSHAKE => SaslHandshakeResponse::decode(input)
153                .map(|(i, kind)| (i, ResponseKind::SaslHandshake(kind)))?,
154
155            COMMAND_DECLARE_PUBLISHER
156            | COMMAND_DELETE_PUBLISHER
157            | COMMAND_SASL_AUTHENTICATE
158            | COMMAND_SUBSCRIBE
159            | COMMAND_UNSUBSCRIBE
160            | COMMAND_CREATE_STREAM
161            | COMMAND_CREATE_SUPER_STREAM
162            | COMMAND_DELETE_SUPER_STREAM
163            | COMMAND_CONSUMER_UPDATE_REQUEST
164            | COMMAND_DELETE_STREAM => {
165                GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
166            }
167            COMMAND_TUNE => {
168                TunesCommand::decode(input).map(|(i, kind)| (i, ResponseKind::Tunes(kind)))?
169            }
170            COMMAND_DELIVER => DeliverCommand::decode(input)
171                .map(|(remaining, kind)| (remaining, ResponseKind::Deliver(kind)))?,
172
173            COMMAND_HEARTBEAT => HeartbeatResponse::decode(input)
174                .map(|(remaining, kind)| (remaining, ResponseKind::Heartbeat(kind)))?,
175            COMMAND_METADATA => MetadataResponse::decode(input)
176                .map(|(remaining, kind)| (remaining, ResponseKind::Metadata(kind)))?,
177            COMMAND_METADATA_UPDATE => MetadataUpdateCommand::decode(input)
178                .map(|(remaining, kind)| (remaining, ResponseKind::MetadataUpdate(kind)))?,
179            COMMAND_PUBLISH_CONFIRM => PublishConfirm::decode(input)
180                .map(|(remaining, kind)| (remaining, ResponseKind::PublishConfirm(kind)))?,
181
182            COMMAND_PUBLISH_ERROR => PublishErrorResponse::decode(input)
183                .map(|(remaining, kind)| (remaining, ResponseKind::PublishError(kind)))?,
184
185            COMMAND_QUERY_OFFSET => QueryOffsetResponse::decode(input)
186                .map(|(remaining, kind)| (remaining, ResponseKind::QueryOffset(kind)))?,
187
188            COMMAND_CREDIT => CreditResponse::decode(input)
189                .map(|(remaining, kind)| (remaining, ResponseKind::Credit(kind)))?,
190
191            COMMAND_QUERY_PUBLISHER_SEQUENCE => QueryPublisherResponse::decode(input)
192                .map(|(remaining, kind)| (remaining, ResponseKind::QueryPublisherSequence(kind)))?,
193            COMMAND_EXCHANGE_COMMAND_VERSIONS => ExchangeCommandVersionsResponse::decode(input)
194                .map(|(remaining, kind)| {
195                    (remaining, ResponseKind::ExchangeCommandVersions(kind))
196                })?,
197            COMMAND_PARTITIONS => SuperStreamPartitionsResponse::decode(input)
198                .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?,
199            COMMAND_ROUTE => SuperStreamRouteResponse::decode(input)
200                .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?,
201            COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input)
202                .map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?,
203            n => return Err(DecodeError::UnsupportedResponseType(n)),
204        };
205        Ok((input, Response { header, kind }))
206    }
207}
208
209impl Decoder for ResponseCode {
210    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
211        let (input, code) = read_u16(input)?;
212        Ok((input, code.try_into()?))
213    }
214}
215
216pub trait FromResponse
217where
218    Self: Sized,
219{
220    fn from_response(response: Response) -> Option<Self>;
221}
222
223#[cfg(test)]
224mod tests {
225
226    use byteorder::{BigEndian, WriteBytesExt};
227
228    use super::{Response, ResponseKind};
229    use crate::{
230        codec::{Decoder, Encoder},
231        commands::{
232            close::CloseResponse, consumer_update::ConsumerUpdateCommand, deliver::DeliverCommand,
233            exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
234            heart_beat::HeartbeatResponse, metadata::MetadataResponse,
235            metadata_update::MetadataUpdateCommand, open::OpenResponse,
236            peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
237            publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
238            query_publisher_sequence::QueryPublisherResponse,
239            sasl_handshake::SaslHandshakeResponse,
240            superstream_partitions::SuperStreamPartitionsResponse,
241            superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
242        },
243        protocol::{
244            commands::{
245                COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST,
246                COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE,
247                COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM,
248                COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE,
249                COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
250            },
251            version::PROTOCOL_VERSION,
252        },
253        response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
254        types::Header,
255        ResponseCode,
256    };
257
258    impl Encoder for ResponseKind {
259        fn encoded_size(&self) -> u32 {
260            match self {
261                ResponseKind::Open(open) => open.encoded_size(),
262                ResponseKind::Close(close) => close.encoded_size(),
263                ResponseKind::PeerProperties(peer_properties) => peer_properties.encoded_size(),
264                ResponseKind::SaslHandshake(handshake) => handshake.encoded_size(),
265                ResponseKind::Generic(generic) => generic.encoded_size(),
266                ResponseKind::Tunes(tune) => tune.encoded_size(),
267                ResponseKind::Heartbeat(heartbeat) => heartbeat.encoded_size(),
268                ResponseKind::Deliver(deliver) => deliver.encoded_size(),
269                ResponseKind::Metadata(metadata) => metadata.encoded_size(),
270                ResponseKind::MetadataUpdate(metadata) => metadata.encoded_size(),
271                ResponseKind::PublishConfirm(publish_confirm) => publish_confirm.encoded_size(),
272                ResponseKind::PublishError(publish_error) => publish_error.encoded_size(),
273                ResponseKind::QueryOffset(query_offset) => query_offset.encoded_size(),
274                ResponseKind::QueryPublisherSequence(query_publisher) => {
275                    query_publisher.encoded_size()
276                }
277                ResponseKind::Credit(credit) => credit.encoded_size(),
278                ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
279                    exchange_command_versions.encoded_size()
280                }
281                ResponseKind::SuperStreamPartitions(super_stream_response) => {
282                    super_stream_response.encoded_size()
283                }
284                ResponseKind::SuperStreamRoute(super_stream_response) => {
285                    super_stream_response.encoded_size()
286                }
287                ResponseKind::ConsumerUpdate(consumer_update_response) => {
288                    consumer_update_response.encoded_size()
289                }
290                ResponseKind::ConsumerUpdateRequest(consumer_update_request_response) => {
291                    consumer_update_request_response.encoded_size()
292                }
293            }
294        }
295
296        fn encode(
297            &self,
298            writer: &mut impl std::io::Write,
299        ) -> Result<(), crate::error::EncodeError> {
300            match self {
301                ResponseKind::Open(open) => open.encode(writer),
302                ResponseKind::Close(close) => close.encode(writer),
303                ResponseKind::PeerProperties(peer_properties) => peer_properties.encode(writer),
304                ResponseKind::SaslHandshake(handshake) => handshake.encode(writer),
305                ResponseKind::Generic(generic) => generic.encode(writer),
306                ResponseKind::Tunes(tune) => tune.encode(writer),
307                ResponseKind::Heartbeat(heartbeat) => heartbeat.encode(writer),
308                ResponseKind::Deliver(deliver) => deliver.encode(writer),
309                ResponseKind::Metadata(metadata) => metadata.encode(writer),
310                ResponseKind::MetadataUpdate(metadata) => metadata.encode(writer),
311                ResponseKind::PublishConfirm(publish_confirm) => publish_confirm.encode(writer),
312                ResponseKind::PublishError(publish_error) => publish_error.encode(writer),
313                ResponseKind::QueryOffset(query_offset) => query_offset.encode(writer),
314                ResponseKind::QueryPublisherSequence(query_publisher) => {
315                    query_publisher.encode(writer)
316                }
317                ResponseKind::Credit(credit) => credit.encode(writer),
318                ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
319                    exchange_command_versions.encode(writer)
320                }
321                ResponseKind::SuperStreamPartitions(super_stream_command_versions) => {
322                    super_stream_command_versions.encode(writer)
323                }
324                ResponseKind::SuperStreamRoute(super_stream_command_versions) => {
325                    super_stream_command_versions.encode(writer)
326                }
327                ResponseKind::ConsumerUpdate(consumer_update_command_version) => {
328                    consumer_update_command_version.encode(writer)
329                }
330                ResponseKind::ConsumerUpdateRequest(consumer_update_request_command_version) => {
331                    consumer_update_request_command_version.encode(writer)
332                }
333            }
334        }
335    }
336
337    impl Encoder for Response {
338        fn encoded_size(&self) -> u32 {
339            self.header.encoded_size() + 2 + self.kind.encoded_size()
340        }
341
342        fn encode(
343            &self,
344            writer: &mut impl std::io::Write,
345        ) -> Result<(), crate::error::EncodeError> {
346            writer.write_u32::<BigEndian>(self.encoded_size())?;
347            self.header.encode(writer)?;
348            self.kind.encode(writer)?;
349            Ok(())
350        }
351    }
352
353    macro_rules! response_test {
354        ($ty:ty, $variant:path, $cmd:expr) => {
355            use fake::{Fake, Faker};
356            let payload: $ty = Faker.fake();
357
358            let response = Response {
359                header: Header::new($cmd, PROTOCOL_VERSION),
360                kind: $variant(payload),
361            };
362
363            let mut buffer = vec![];
364
365            response.encode(&mut buffer).unwrap();
366
367            let (remaining, decoded) = Response::decode(&buffer).unwrap();
368
369            assert_eq!(response, decoded);
370
371            assert!(remaining.is_empty());
372        };
373    }
374
375    macro_rules! response_payload_test {
376        ($payload:expr, $variant:path, $cmd:expr) => {
377            let response = Response {
378                header: Header::new($cmd, PROTOCOL_VERSION),
379                kind: $variant($payload),
380            };
381
382            let mut buffer = vec![];
383
384            response.encode(&mut buffer).unwrap();
385
386            let (remaining, decoded) = Response::decode(&buffer).unwrap();
387
388            assert_eq!(response, decoded);
389
390            assert!(remaining.is_empty());
391        };
392    }
393
394    #[test]
395    fn open_response_ok_test() {
396        use {fake::Fake, fake::Faker};
397        let mut response: OpenResponse = Faker.fake();
398        response.code = ResponseCode::Ok;
399        response_payload_test!(response, ResponseKind::Open, COMMAND_OPEN);
400    }
401
402    #[test]
403    fn peer_properties_response_test() {
404        response_test!(
405            PeerPropertiesResponse,
406            ResponseKind::PeerProperties,
407            COMMAND_PEER_PROPERTIES
408        );
409    }
410
411    #[test]
412    fn sasl_handshake_response_test() {
413        response_test!(
414            SaslHandshakeResponse,
415            ResponseKind::SaslHandshake,
416            COMMAND_SASL_HANDSHAKE
417        );
418    }
419
420    #[test]
421    fn generic_response_test() {
422        response_test!(
423            GenericResponse,
424            ResponseKind::Generic,
425            COMMAND_SASL_AUTHENTICATE
426        );
427    }
428
429    #[test]
430    fn tune_response_test() {
431        response_test!(TunesCommand, ResponseKind::Tunes, COMMAND_TUNE);
432    }
433    #[test]
434    fn metadata_response_test() {
435        response_test!(MetadataResponse, ResponseKind::Metadata, COMMAND_METADATA);
436    }
437    #[test]
438    fn close_response_test() {
439        response_test!(CloseResponse, ResponseKind::Close, COMMAND_CLOSE);
440    }
441    #[test]
442    fn deliver_response_test() {
443        response_test!(DeliverCommand, ResponseKind::Deliver, COMMAND_DELIVER);
444    }
445    #[test]
446    fn metadata_update_response_test() {
447        response_test!(
448            MetadataUpdateCommand,
449            ResponseKind::MetadataUpdate,
450            COMMAND_METADATA_UPDATE
451        );
452    }
453    #[test]
454    fn publish_confirm_response_test() {
455        response_test!(
456            PublishConfirm,
457            ResponseKind::PublishConfirm,
458            COMMAND_PUBLISH_CONFIRM
459        );
460    }
461    #[test]
462    fn publish_error_response_test() {
463        response_test!(
464            PublishErrorResponse,
465            ResponseKind::PublishError,
466            COMMAND_PUBLISH_ERROR
467        );
468    }
469    #[test]
470    fn query_offset_response_test() {
471        response_test!(
472            QueryOffsetResponse,
473            ResponseKind::QueryOffset,
474            COMMAND_QUERY_OFFSET
475        );
476    }
477    #[test]
478    fn query_publisher_response_test() {
479        response_test!(
480            QueryPublisherResponse,
481            ResponseKind::QueryPublisherSequence,
482            COMMAND_QUERY_PUBLISHER_SEQUENCE
483        );
484    }
485    #[test]
486    fn heartbeat_response_test() {
487        response_test!(
488            HeartbeatResponse,
489            ResponseKind::Heartbeat,
490            COMMAND_HEARTBEAT
491        );
492    }
493
494    #[test]
495    fn exchange_command_versions_response_test() {
496        response_test!(
497            ExchangeCommandVersionsResponse,
498            ResponseKind::ExchangeCommandVersions,
499            COMMAND_EXCHANGE_COMMAND_VERSIONS
500        );
501    }
502
503    #[test]
504    fn super_stream_partitions_response_test() {
505        response_test!(
506            SuperStreamPartitionsResponse,
507            ResponseKind::SuperStreamPartitions,
508            COMMAND_PARTITIONS
509        );
510    }
511
512    #[test]
513    fn super_stream_route_response_test() {
514        response_test!(
515            SuperStreamRouteResponse,
516            ResponseKind::SuperStreamRoute,
517            COMMAND_ROUTE
518        );
519    }
520
521    #[test]
522    fn consumer_update_response_test() {
523        response_test!(
524            ConsumerUpdateCommand,
525            ResponseKind::ConsumerUpdate,
526            COMMAND_CONSUMER_UPDATE
527        );
528    }
529}