rabbitmq_stream_protocol/response/
mod.rs

1use std::convert::TryInto;
2
3use crate::{
4    codec::{
5        decoder::{read_u16, read_u32},
6        Decoder,
7    },
8    commands::{
9        close::CloseResponse, consumer_update::ConsumerUpdateCommand,
10        consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse,
11        deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse,
12        generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse,
13        metadata_update::MetadataUpdateCommand, open::OpenResponse,
14        peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
15        publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
16        query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
17        superstream_partitions::SuperStreamPartitionsResponse,
18        superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
19    },
20    error::DecodeError,
21    protocol::commands::*,
22    types::Header,
23};
24
25mod shims;
26
/// Status code attached to protocol responses.
///
/// The `Decoder` implementation for this type reads a `u16` from the input
/// and converts it into one of these variants with `TryInto`.
///
/// NOTE(review): `SaslChallange` and `PrecoditionFailed` look like
/// misspellings of "Challenge" and "Precondition". They are left untouched
/// because renaming a public variant would break existing callers.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ResponseCode {
    Ok,
    StreamDoesNotExist,
    SubscriptionIdAlreadyExists,
    SubscriptionIdDoesNotExist,
    StreamAlreadyExists,
    StreamNotAvailable,
    SaslMechanismNotSupported,
    AuthenticationFailure,
    SaslError,
    SaslChallange,
    AuthenticationFailureLoopback,
    VirtualHostAccessFailure,
    UnknownFrame,
    FrameTooLarge,
    InternalError,
    AccessRefused,
    PrecoditionFailed,
    PublisherDoesNotExist,
    OffsetNotFound,
}
50#[derive(Debug, PartialEq, Eq)]
51pub struct Response {
52    header: Header,
53    pub(crate) kind: ResponseKind,
54}
55
56#[derive(Debug, PartialEq, Eq)]
57pub enum ResponseKind {
58    Open(OpenResponse),
59    Close(CloseResponse),
60    PeerProperties(PeerPropertiesResponse),
61    SaslHandshake(SaslHandshakeResponse),
62    Generic(GenericResponse),
63    Tunes(TunesCommand),
64    Deliver(DeliverCommand),
65    Heartbeat(HeartbeatResponse),
66    Metadata(MetadataResponse),
67    MetadataUpdate(MetadataUpdateCommand),
68    PublishConfirm(PublishConfirm),
69    PublishError(PublishErrorResponse),
70    QueryOffset(QueryOffsetResponse),
71    QueryPublisherSequence(QueryPublisherResponse),
72    Credit(CreditResponse),
73    ExchangeCommandVersions(ExchangeCommandVersionsResponse),
74    SuperStreamPartitions(SuperStreamPartitionsResponse),
75    SuperStreamRoute(SuperStreamRouteResponse),
76    ConsumerUpdate(ConsumerUpdateCommand),
77    ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
78}
79
80impl Response {
81    pub fn new(header: Header, kind: ResponseKind) -> Self {
82        Self { header, kind }
83    }
84
85    pub fn correlation_id(&self) -> Option<u32> {
86        match &self.kind {
87            ResponseKind::Open(open) => Some(open.correlation_id),
88            ResponseKind::Close(close) => Some(close.correlation_id),
89            ResponseKind::PeerProperties(peer_properties) => Some(peer_properties.correlation_id),
90            ResponseKind::SaslHandshake(handshake) => Some(handshake.correlation_id),
91            ResponseKind::Generic(generic) => Some(generic.correlation_id),
92            ResponseKind::Metadata(metadata) => Some(metadata.correlation_id),
93            ResponseKind::QueryOffset(query_offset) => Some(query_offset.correlation_id),
94            ResponseKind::QueryPublisherSequence(query_publisher) => {
95                Some(query_publisher.correlation_id)
96            }
97            ResponseKind::MetadataUpdate(_) => None,
98            ResponseKind::PublishConfirm(_) => None,
99            ResponseKind::PublishError(_) => None,
100            ResponseKind::Tunes(_) => None,
101            ResponseKind::Heartbeat(_) => None,
102            ResponseKind::Deliver(_) => None,
103            ResponseKind::Credit(_) => None,
104            ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
105                Some(exchange_command_versions.correlation_id)
106            }
107            ResponseKind::SuperStreamPartitions(super_stream_partitions_command) => {
108                Some(super_stream_partitions_command.correlation_id)
109            }
110            ResponseKind::SuperStreamRoute(super_stream_route_command) => {
111                Some(super_stream_route_command.correlation_id)
112            }
113            ResponseKind::ConsumerUpdate(consumer_update_command) => {
114                Some(consumer_update_command.correlation_id)
115            }
116            ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => {
117                Some(consumer_update_request_command.correlation_id)
118            }
119        }
120    }
121
122    pub fn get<T>(self) -> Option<T>
123    where
124        T: FromResponse,
125    {
126        T::from_response(self)
127    }
128
129    pub fn kind_ref(&self) -> &ResponseKind {
130        &self.kind
131    }
132    pub fn kind(self) -> ResponseKind {
133        self.kind
134    }
135}
136
137impl Decoder for Response {
138    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
139        let (input, _) = read_u32(input)?;
140
141        let (input, header) = Header::decode(input)?;
142        let (input, kind) = match header.key() {
143            COMMAND_OPEN => {
144                OpenResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Open(kind)))?
145            }
146
147            COMMAND_CLOSE => {
148                CloseResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Close(kind)))?
149            }
150            COMMAND_PEER_PROPERTIES => PeerPropertiesResponse::decode(input)
151                .map(|(i, kind)| (i, ResponseKind::PeerProperties(kind)))?,
152            COMMAND_SASL_HANDSHAKE => SaslHandshakeResponse::decode(input)
153                .map(|(i, kind)| (i, ResponseKind::SaslHandshake(kind)))?,
154
155            COMMAND_DECLARE_PUBLISHER
156            | COMMAND_DELETE_PUBLISHER
157            | COMMAND_SASL_AUTHENTICATE
158            | COMMAND_SUBSCRIBE
159            | COMMAND_UNSUBSCRIBE
160            | COMMAND_CREATE_STREAM
161            | COMMAND_CREATE_SUPER_STREAM
162            | COMMAND_DELETE_SUPER_STREAM
163            | COMMAND_CONSUMER_UPDATE_REQUEST
164            | COMMAND_DELETE_STREAM => {
165                GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
166            }
167            COMMAND_TUNE => {
168                TunesCommand::decode(input).map(|(i, kind)| (i, ResponseKind::Tunes(kind)))?
169            }
170            COMMAND_DELIVER => DeliverCommand::decode(input)
171                .map(|(remaining, kind)| (remaining, ResponseKind::Deliver(kind)))?,
172
173            COMMAND_HEARTBEAT => HeartbeatResponse::decode(input)
174                .map(|(remaining, kind)| (remaining, ResponseKind::Heartbeat(kind)))?,
175            COMMAND_METADATA => MetadataResponse::decode(input)
176                .map(|(remaining, kind)| (remaining, ResponseKind::Metadata(kind)))?,
177            COMMAND_METADATA_UPDATE => MetadataUpdateCommand::decode(input)
178                .map(|(remaining, kind)| (remaining, ResponseKind::MetadataUpdate(kind)))?,
179            COMMAND_PUBLISH_CONFIRM => PublishConfirm::decode(input)
180                .map(|(remaining, kind)| (remaining, ResponseKind::PublishConfirm(kind)))?,
181
182            COMMAND_PUBLISH_ERROR => PublishErrorResponse::decode(input)
183                .map(|(remaining, kind)| (remaining, ResponseKind::PublishError(kind)))?,
184
185            COMMAND_QUERY_OFFSET => QueryOffsetResponse::decode(input)
186                .map(|(remaining, kind)| (remaining, ResponseKind::QueryOffset(kind)))?,
187
188            COMMAND_CREDIT => CreditResponse::decode(input)
189                .map(|(remaining, kind)| (remaining, ResponseKind::Credit(kind)))?,
190
191            COMMAND_QUERY_PUBLISHER_SEQUENCE => QueryPublisherResponse::decode(input)
192                .map(|(remaining, kind)| (remaining, ResponseKind::QueryPublisherSequence(kind)))?,
193            COMMAND_EXCHANGE_COMMAND_VERSIONS => ExchangeCommandVersionsResponse::decode(input)
194                .map(|(remaining, kind)| {
195                    (remaining, ResponseKind::ExchangeCommandVersions(kind))
196                })?,
197            COMMAND_PARTITIONS => SuperStreamPartitionsResponse::decode(input)
198                .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?,
199            COMMAND_ROUTE => SuperStreamRouteResponse::decode(input)
200                .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?,
201            COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input)
202                .map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?,
203            n => return Err(DecodeError::UnsupportedResponseType(n)),
204        };
205        Ok((input, Response { header, kind }))
206    }
207}
208
209impl Decoder for ResponseCode {
210    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
211        let (input, code) = read_u16(input)?;
212        Ok((input, code.try_into()?))
213    }
214}
215
216pub trait FromResponse
217where
218    Self: Sized,
219{
220    fn from_response(response: Response) -> Option<Self>;
221}
222
#[cfg(test)]
mod tests {
    //! Encode/decode round-trip tests for [`Response`].
    //!
    //! The `Encoder` implementations below exist only in test builds; they
    //! are used to produce the bytes that `Response::decode` is checked
    //! against.

    use std::collections::HashMap;

    use byteorder::{BigEndian, WriteBytesExt};

    use super::{Response, ResponseKind};
    use crate::{
        codec::{Decoder, Encoder},
        commands::{
            close::CloseResponse, consumer_update::ConsumerUpdateCommand,
            consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand,
            exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
            heart_beat::HeartbeatResponse, metadata::MetadataResponse,
            metadata_update::MetadataUpdateCommand, open::OpenResponse,
            peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
            publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
            query_publisher_sequence::QueryPublisherResponse,
            sasl_handshake::SaslHandshakeResponse,
            superstream_partitions::SuperStreamPartitionsResponse,
            superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
        },
        protocol::{
            commands::{
                COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST,
                COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE,
                COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM,
                COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE,
                COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
            },
            version::PROTOCOL_VERSION,
        },
        response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
        types::Header,
        ResponseCode,
    };

    /// Forwards both `Encoder` methods to whichever payload the variant holds.
    impl Encoder for ResponseKind {
        fn encoded_size(&self) -> u32 {
            match self {
                ResponseKind::Open(inner) => inner.encoded_size(),
                ResponseKind::Close(inner) => inner.encoded_size(),
                ResponseKind::PeerProperties(inner) => inner.encoded_size(),
                ResponseKind::SaslHandshake(inner) => inner.encoded_size(),
                ResponseKind::Generic(inner) => inner.encoded_size(),
                ResponseKind::Tunes(inner) => inner.encoded_size(),
                ResponseKind::Heartbeat(inner) => inner.encoded_size(),
                ResponseKind::Deliver(inner) => inner.encoded_size(),
                ResponseKind::Metadata(inner) => inner.encoded_size(),
                ResponseKind::MetadataUpdate(inner) => inner.encoded_size(),
                ResponseKind::PublishConfirm(inner) => inner.encoded_size(),
                ResponseKind::PublishError(inner) => inner.encoded_size(),
                ResponseKind::QueryOffset(inner) => inner.encoded_size(),
                ResponseKind::QueryPublisherSequence(inner) => inner.encoded_size(),
                ResponseKind::Credit(inner) => inner.encoded_size(),
                ResponseKind::ExchangeCommandVersions(inner) => inner.encoded_size(),
                ResponseKind::SuperStreamPartitions(inner) => inner.encoded_size(),
                ResponseKind::SuperStreamRoute(inner) => inner.encoded_size(),
                ResponseKind::ConsumerUpdate(inner) => inner.encoded_size(),
                ResponseKind::ConsumerUpdateRequest(inner) => inner.encoded_size(),
            }
        }

        fn encode(
            &self,
            writer: &mut impl std::io::Write,
        ) -> Result<(), crate::error::EncodeError> {
            match self {
                ResponseKind::Open(inner) => inner.encode(writer),
                ResponseKind::Close(inner) => inner.encode(writer),
                ResponseKind::PeerProperties(inner) => inner.encode(writer),
                ResponseKind::SaslHandshake(inner) => inner.encode(writer),
                ResponseKind::Generic(inner) => inner.encode(writer),
                ResponseKind::Tunes(inner) => inner.encode(writer),
                ResponseKind::Heartbeat(inner) => inner.encode(writer),
                ResponseKind::Deliver(inner) => inner.encode(writer),
                ResponseKind::Metadata(inner) => inner.encode(writer),
                ResponseKind::MetadataUpdate(inner) => inner.encode(writer),
                ResponseKind::PublishConfirm(inner) => inner.encode(writer),
                ResponseKind::PublishError(inner) => inner.encode(writer),
                ResponseKind::QueryOffset(inner) => inner.encode(writer),
                ResponseKind::QueryPublisherSequence(inner) => inner.encode(writer),
                ResponseKind::Credit(inner) => inner.encode(writer),
                ResponseKind::ExchangeCommandVersions(inner) => inner.encode(writer),
                ResponseKind::SuperStreamPartitions(inner) => inner.encode(writer),
                ResponseKind::SuperStreamRoute(inner) => inner.encode(writer),
                ResponseKind::ConsumerUpdate(inner) => inner.encode(writer),
                ResponseKind::ConsumerUpdateRequest(inner) => inner.encode(writer),
            }
        }
    }

    /// Writes a `u32` size prefix, then the header, then the payload.
    impl Encoder for Response {
        fn encoded_size(&self) -> u32 {
            let header_len = self.header.encoded_size();
            let payload_len = self.kind.encoded_size();
            // NOTE(review): nothing visible in this module explains the extra
            // 2 bytes, and `Response::decode` discards the size prefix, so a
            // wrong value would go unnoticed by these tests - confirm.
            header_len + 2 + payload_len
        }

        fn encode(
            &self,
            writer: &mut impl std::io::Write,
        ) -> Result<(), crate::error::EncodeError> {
            writer.write_u32::<BigEndian>(self.encoded_size())?;
            self.header.encode(writer)?;
            self.kind.encode(writer)
        }
    }

    /// Round-trips the given payload expression: wraps it in a `Response`
    /// with the given command key, encodes it, decodes the bytes, and checks
    /// that the result equals the input and that no bytes are left over.
    macro_rules! response_payload_test {
        ($payload:expr, $variant:path, $cmd:expr) => {
            let expected = Response {
                header: Header::new($cmd, PROTOCOL_VERSION),
                kind: $variant($payload),
            };

            let mut encoded = Vec::new();
            expected.encode(&mut encoded).unwrap();

            let (leftover, actual) = Response::decode(&encoded).unwrap();

            assert_eq!(expected, actual);
            assert!(leftover.is_empty());
        };
    }

    /// Same round trip as `response_payload_test!`, with the payload of type
    /// `$ty` produced by `Faker`.
    macro_rules! response_test {
        ($ty:ty, $variant:path, $cmd:expr) => {
            use fake::{Fake, Faker};
            let payload: $ty = Faker.fake();
            response_payload_test!(payload, $variant, $cmd);
        };
    }

    /// Round-trips an `OpenResponse` whose code is forced to `Ok`.
    ///
    /// NOTE(review): presumably other codes do not round-trip the whole
    /// payload - confirm against `OpenResponse`'s codec.
    #[test]
    fn open_response_ok_test() {
        use fake::{Fake, Faker};
        let mut payload: OpenResponse = Faker.fake();
        payload.code = ResponseCode::Ok;
        response_payload_test!(payload, ResponseKind::Open, COMMAND_OPEN);
    }

    /// Round-trips a `PeerPropertiesResponse`.
    #[test]
    fn peer_properties_response_test() {
        response_test!(
            PeerPropertiesResponse,
            ResponseKind::PeerProperties,
            COMMAND_PEER_PROPERTIES
        );
    }

    /// Round-trips a `SaslHandshakeResponse`.
    #[test]
    fn sasl_handshake_response_test() {
        response_test!(
            SaslHandshakeResponse,
            ResponseKind::SaslHandshake,
            COMMAND_SASL_HANDSHAKE
        );
    }

    /// Round-trips a `GenericResponse`, using the SASL authenticate key as
    /// the representative of the keys that decode to `Generic`.
    #[test]
    fn generic_response_test() {
        response_test!(
            GenericResponse,
            ResponseKind::Generic,
            COMMAND_SASL_AUTHENTICATE
        );
    }

    /// Round-trips a `TunesCommand`.
    #[test]
    fn tune_response_test() {
        response_test!(TunesCommand, ResponseKind::Tunes, COMMAND_TUNE);
    }

    /// Round-trips a `MetadataResponse`.
    #[test]
    fn metadata_response_test() {
        response_test!(MetadataResponse, ResponseKind::Metadata, COMMAND_METADATA);
    }

    /// Round-trips a `CloseResponse`.
    #[test]
    fn close_response_test() {
        response_test!(CloseResponse, ResponseKind::Close, COMMAND_CLOSE);
    }

    /// Round-trips a `DeliverCommand`.
    #[test]
    fn deliver_response_test() {
        response_test!(DeliverCommand, ResponseKind::Deliver, COMMAND_DELIVER);
    }

    /// Round-trips a `MetadataUpdateCommand`.
    #[test]
    fn metadata_update_response_test() {
        response_test!(
            MetadataUpdateCommand,
            ResponseKind::MetadataUpdate,
            COMMAND_METADATA_UPDATE
        );
    }

    /// Round-trips a `PublishConfirm`.
    #[test]
    fn publish_confirm_response_test() {
        response_test!(
            PublishConfirm,
            ResponseKind::PublishConfirm,
            COMMAND_PUBLISH_CONFIRM
        );
    }

    /// Round-trips a `PublishErrorResponse`.
    #[test]
    fn publish_error_response_test() {
        response_test!(
            PublishErrorResponse,
            ResponseKind::PublishError,
            COMMAND_PUBLISH_ERROR
        );
    }

    /// Round-trips a `QueryOffsetResponse`.
    #[test]
    fn query_offset_response_test() {
        response_test!(
            QueryOffsetResponse,
            ResponseKind::QueryOffset,
            COMMAND_QUERY_OFFSET
        );
    }

    /// Round-trips a `QueryPublisherResponse`.
    #[test]
    fn query_publisher_response_test() {
        response_test!(
            QueryPublisherResponse,
            ResponseKind::QueryPublisherSequence,
            COMMAND_QUERY_PUBLISHER_SEQUENCE
        );
    }

    /// Round-trips a `HeartbeatResponse`.
    #[test]
    fn heartbeat_response_test() {
        response_test!(
            HeartbeatResponse,
            ResponseKind::Heartbeat,
            COMMAND_HEARTBEAT
        );
    }

    /// Round-trips an `ExchangeCommandVersionsResponse`.
    #[test]
    fn exchange_command_versions_response_test() {
        response_test!(
            ExchangeCommandVersionsResponse,
            ResponseKind::ExchangeCommandVersions,
            COMMAND_EXCHANGE_COMMAND_VERSIONS
        );
    }

    /// Round-trips a `SuperStreamPartitionsResponse`.
    #[test]
    fn super_stream_partitions_response_test() {
        response_test!(
            SuperStreamPartitionsResponse,
            ResponseKind::SuperStreamPartitions,
            COMMAND_PARTITIONS
        );
    }

    /// Round-trips a `SuperStreamRouteResponse`.
    #[test]
    fn super_stream_route_response_test() {
        response_test!(
            SuperStreamRouteResponse,
            ResponseKind::SuperStreamRoute,
            COMMAND_ROUTE
        );
    }

    /// Round-trips a `ConsumerUpdateCommand`.
    #[test]
    fn consumer_update_response_test() {
        response_test!(
            ConsumerUpdateCommand,
            ResponseKind::ConsumerUpdate,
            COMMAND_CONSUMER_UPDATE
        );
    }
}