rabbitmq_stream_protocol/response/
mod.rs

1use std::convert::TryInto;
2
3use crate::{
4    codec::{
5        decoder::{read_u16, read_u32},
6        Decoder,
7    },
8    commands::{
9        close::CloseResponse, consumer_update::ConsumerUpdateCommand,
10        consumer_update_request::ConsumerUpdateRequestCommand, credit::CreditResponse,
11        deliver::DeliverCommand, exchange_command_versions::ExchangeCommandVersionsResponse,
12        generic::GenericResponse, heart_beat::HeartbeatResponse, metadata::MetadataResponse,
13        metadata_update::MetadataUpdateCommand, open::OpenResponse,
14        peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
15        publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
16        query_publisher_sequence::QueryPublisherResponse, sasl_handshake::SaslHandshakeResponse,
17        superstream_partitions::SuperStreamPartitionsResponse,
18        superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
19    },
20    error::DecodeError,
21    protocol::commands::*,
22    types::Header,
23};
24
25mod shims;
26
/// Status code carried by protocol responses.
///
/// On the wire a code is a `u16`; the mapping between numeric values and
/// variants is provided by the `u16` -> `ResponseCode` fallible conversion
/// used by this type's `Decoder` implementation (defined outside this file).
///
/// Note: `SaslChallange` and `PrecoditionFailed` are misspelled, but they are
/// part of the public API, so renaming them would break callers.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ResponseCode {
    Ok,
    StreamDoesNotExist,
    SubscriptionIdAlreadyExists,
    SubscriptionIdDoesNotExist,
    StreamAlreadyExists,
    StreamNotAvailable,
    SaslMechanismNotSupported,
    AuthenticationFailure,
    SaslError,
    SaslChallange,
    AuthenticationFailureLoopback,
    VirtualHostAccessFailure,
    UnknownFrame,
    FrameTooLarge,
    InternalError,
    AccessRefused,
    PrecoditionFailed,
    PublisherDoesNotExist,
    OffsetNotFound,
}
50#[derive(Debug, PartialEq, Eq)]
51pub struct Response {
52    header: Header,
53    pub(crate) kind: ResponseKind,
54}
55
56#[derive(Debug, PartialEq, Eq)]
57pub enum ResponseKind {
58    Open(OpenResponse),
59    Close(CloseResponse),
60    PeerProperties(PeerPropertiesResponse),
61    SaslHandshake(SaslHandshakeResponse),
62    Generic(GenericResponse),
63    Tunes(TunesCommand),
64    Deliver(DeliverCommand),
65    Heartbeat(HeartbeatResponse),
66    Metadata(MetadataResponse),
67    MetadataUpdate(MetadataUpdateCommand),
68    PublishConfirm(PublishConfirm),
69    PublishError(PublishErrorResponse),
70    QueryOffset(QueryOffsetResponse),
71    QueryPublisherSequence(QueryPublisherResponse),
72    Credit(CreditResponse),
73    ExchangeCommandVersions(ExchangeCommandVersionsResponse),
74    SuperStreamPartitions(SuperStreamPartitionsResponse),
75    SuperStreamRoute(SuperStreamRouteResponse),
76    ConsumerUpdate(ConsumerUpdateCommand),
77    ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
78}
79
80impl Response {
81    pub fn new(header: Header, kind: ResponseKind) -> Self {
82        Self { header, kind }
83    }
84
85    pub fn correlation_id(&self) -> Option<u32> {
86        match &self.kind {
87            ResponseKind::Open(open) => Some(open.correlation_id),
88            ResponseKind::Close(close) => Some(close.correlation_id),
89            ResponseKind::PeerProperties(peer_properties) => Some(peer_properties.correlation_id),
90            ResponseKind::SaslHandshake(handshake) => Some(handshake.correlation_id),
91            ResponseKind::Generic(generic) => Some(generic.correlation_id),
92            ResponseKind::Metadata(metadata) => Some(metadata.correlation_id),
93            ResponseKind::QueryOffset(query_offset) => Some(query_offset.correlation_id),
94            ResponseKind::QueryPublisherSequence(query_publisher) => {
95                Some(query_publisher.correlation_id)
96            }
97            ResponseKind::MetadataUpdate(_) => None,
98            ResponseKind::PublishConfirm(_) => None,
99            ResponseKind::PublishError(_) => None,
100            ResponseKind::Tunes(_) => None,
101            ResponseKind::Heartbeat(_) => None,
102            ResponseKind::Deliver(_) => None,
103            ResponseKind::Credit(_) => None,
104            ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
105                Some(exchange_command_versions.correlation_id)
106            }
107            ResponseKind::SuperStreamPartitions(super_stream_partitions_command) => {
108                Some(super_stream_partitions_command.correlation_id)
109            }
110            ResponseKind::SuperStreamRoute(super_stream_route_command) => {
111                Some(super_stream_route_command.correlation_id)
112            }
113            ResponseKind::ConsumerUpdate(consumer_update_command) => {
114                Some(consumer_update_command.correlation_id)
115            }
116            ResponseKind::ConsumerUpdateRequest(consumer_update_request_command) => {
117                Some(consumer_update_request_command.correlation_id)
118            }
119        }
120    }
121
122    pub fn get<T>(self) -> Option<T>
123    where
124        T: FromResponse,
125    {
126        T::from_response(self)
127    }
128
129    pub fn kind_ref(&self) -> &ResponseKind {
130        &self.kind
131    }
132    pub fn kind(self) -> ResponseKind {
133        self.kind
134    }
135}
136
137impl Decoder for Response {
138    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
139        let (input, _) = read_u32(input)?;
140
141        let (input, header) = Header::decode(input)?;
142        let (input, kind) = match header.key() {
143            COMMAND_OPEN => {
144                OpenResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Open(kind)))?
145            }
146
147            COMMAND_CLOSE => {
148                CloseResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Close(kind)))?
149            }
150            COMMAND_PEER_PROPERTIES => PeerPropertiesResponse::decode(input)
151                .map(|(i, kind)| (i, ResponseKind::PeerProperties(kind)))?,
152            COMMAND_SASL_HANDSHAKE => SaslHandshakeResponse::decode(input)
153                .map(|(i, kind)| (i, ResponseKind::SaslHandshake(kind)))?,
154
155            COMMAND_DECLARE_PUBLISHER
156            | COMMAND_DELETE_PUBLISHER
157            | COMMAND_SASL_AUTHENTICATE
158            | COMMAND_SUBSCRIBE
159            | COMMAND_UNSUBSCRIBE
160            | COMMAND_CREATE_STREAM
161            | COMMAND_CREATE_SUPER_STREAM
162            | COMMAND_DELETE_SUPER_STREAM
163            | COMMAND_CONSUMER_UPDATE_REQUEST
164            | COMMAND_DELETE_STREAM => {
165                GenericResponse::decode(input).map(|(i, kind)| (i, ResponseKind::Generic(kind)))?
166            }
167            COMMAND_TUNE => {
168                TunesCommand::decode(input).map(|(i, kind)| (i, ResponseKind::Tunes(kind)))?
169            }
170            COMMAND_DELIVER => DeliverCommand::decode(input)
171                .map(|(remaining, kind)| (remaining, ResponseKind::Deliver(kind)))?,
172
173            COMMAND_HEARTBEAT => HeartbeatResponse::decode(input)
174                .map(|(remaining, kind)| (remaining, ResponseKind::Heartbeat(kind)))?,
175            COMMAND_METADATA => MetadataResponse::decode(input)
176                .map(|(remaining, kind)| (remaining, ResponseKind::Metadata(kind)))?,
177            COMMAND_METADATA_UPDATE => MetadataUpdateCommand::decode(input)
178                .map(|(remaining, kind)| (remaining, ResponseKind::MetadataUpdate(kind)))?,
179            COMMAND_PUBLISH_CONFIRM => PublishConfirm::decode(input)
180                .map(|(remaining, kind)| (remaining, ResponseKind::PublishConfirm(kind)))?,
181
182            COMMAND_PUBLISH_ERROR => PublishErrorResponse::decode(input)
183                .map(|(remaining, kind)| (remaining, ResponseKind::PublishError(kind)))?,
184
185            COMMAND_QUERY_OFFSET => QueryOffsetResponse::decode(input)
186                .map(|(remaining, kind)| (remaining, ResponseKind::QueryOffset(kind)))?,
187
188            COMMAND_CREDIT => CreditResponse::decode(input)
189                .map(|(remaining, kind)| (remaining, ResponseKind::Credit(kind)))?,
190
191            COMMAND_QUERY_PUBLISHER_SEQUENCE => QueryPublisherResponse::decode(input)
192                .map(|(remaining, kind)| (remaining, ResponseKind::QueryPublisherSequence(kind)))?,
193            COMMAND_EXCHANGE_COMMAND_VERSIONS => ExchangeCommandVersionsResponse::decode(input)
194                .map(|(remaining, kind)| {
195                    (remaining, ResponseKind::ExchangeCommandVersions(kind))
196                })?,
197            COMMAND_PARTITIONS => SuperStreamPartitionsResponse::decode(input)
198                .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamPartitions(kind)))?,
199            COMMAND_ROUTE => SuperStreamRouteResponse::decode(input)
200                .map(|(remaining, kind)| (remaining, ResponseKind::SuperStreamRoute(kind)))?,
201            COMMAND_CONSUMER_UPDATE => ConsumerUpdateCommand::decode(input)
202                .map(|(remaining, kind)| (remaining, ResponseKind::ConsumerUpdate(kind)))?,
203            n => return Err(DecodeError::UnsupportedResponseType(n)),
204        };
205        Ok((input, Response { header, kind }))
206    }
207}
208
209impl Decoder for ResponseCode {
210    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
211        let (input, code) = read_u16(input)?;
212        Ok((input, code.try_into()?))
213    }
214}
215
216pub trait FromResponse
217where
218    Self: Sized,
219{
220    fn from_response(response: Response) -> Option<Self>;
221}
222
#[cfg(test)]
mod tests {
    //! Round-trip tests: each response is encoded with the test-only
    //! `Encoder` implementations below and decoded back with
    //! `Response::decode`.

    use byteorder::{BigEndian, WriteBytesExt};

    use super::{Response, ResponseKind};
    use crate::{
        codec::{Decoder, Encoder},
        commands::{
            close::CloseResponse, consumer_update::ConsumerUpdateCommand,
            consumer_update_request::ConsumerUpdateRequestCommand, deliver::DeliverCommand,
            exchange_command_versions::ExchangeCommandVersionsResponse, generic::GenericResponse,
            heart_beat::HeartbeatResponse, metadata::MetadataResponse,
            metadata_update::MetadataUpdateCommand, open::OpenResponse,
            peer_properties::PeerPropertiesResponse, publish_confirm::PublishConfirm,
            publish_error::PublishErrorResponse, query_offset::QueryOffsetResponse,
            query_publisher_sequence::QueryPublisherResponse,
            sasl_handshake::SaslHandshakeResponse,
            superstream_partitions::SuperStreamPartitionsResponse,
            superstream_route::SuperStreamRouteResponse, tune::TunesCommand,
        },
        protocol::{
            commands::{
                COMMAND_CLOSE, COMMAND_CONSUMER_UPDATE, COMMAND_CONSUMER_UPDATE_REQUEST,
                COMMAND_DELIVER, COMMAND_HEARTBEAT, COMMAND_METADATA, COMMAND_METADATA_UPDATE,
                COMMAND_OPEN, COMMAND_PARTITIONS, COMMAND_PEER_PROPERTIES, COMMAND_PUBLISH_CONFIRM,
                COMMAND_PUBLISH_ERROR, COMMAND_QUERY_OFFSET, COMMAND_QUERY_PUBLISHER_SEQUENCE,
                COMMAND_ROUTE, COMMAND_SASL_AUTHENTICATE, COMMAND_SASL_HANDSHAKE, COMMAND_TUNE,
            },
            version::PROTOCOL_VERSION,
        },
        response::COMMAND_EXCHANGE_COMMAND_VERSIONS,
        types::Header,
        ResponseCode,
    };

    /// Expands to an exhaustive `match` over every `ResponseKind` variant,
    /// binding the wrapped payload to `$payload` and evaluating `$action`
    /// in each arm.
    macro_rules! with_payload {
        ($kind:expr, $payload:ident => $action:expr) => {
            match $kind {
                ResponseKind::Open($payload) => $action,
                ResponseKind::Close($payload) => $action,
                ResponseKind::PeerProperties($payload) => $action,
                ResponseKind::SaslHandshake($payload) => $action,
                ResponseKind::Generic($payload) => $action,
                ResponseKind::Tunes($payload) => $action,
                ResponseKind::Heartbeat($payload) => $action,
                ResponseKind::Deliver($payload) => $action,
                ResponseKind::Metadata($payload) => $action,
                ResponseKind::MetadataUpdate($payload) => $action,
                ResponseKind::PublishConfirm($payload) => $action,
                ResponseKind::PublishError($payload) => $action,
                ResponseKind::QueryOffset($payload) => $action,
                ResponseKind::QueryPublisherSequence($payload) => $action,
                ResponseKind::Credit($payload) => $action,
                ResponseKind::ExchangeCommandVersions($payload) => $action,
                ResponseKind::SuperStreamPartitions($payload) => $action,
                ResponseKind::SuperStreamRoute($payload) => $action,
                ResponseKind::ConsumerUpdate($payload) => $action,
                ResponseKind::ConsumerUpdateRequest($payload) => $action,
            }
        };
    }

    /// Test-only encoder: delegates to the encoder of the wrapped payload.
    impl Encoder for ResponseKind {
        fn encoded_size(&self) -> u32 {
            with_payload!(self, payload => payload.encoded_size())
        }

        fn encode(
            &self,
            writer: &mut impl std::io::Write,
        ) -> Result<(), crate::error::EncodeError> {
            with_payload!(self, payload => payload.encode(writer))
        }
    }

    /// Test-only encoder: writes a big-endian `u32` size, then the header,
    /// then the payload.
    impl Encoder for Response {
        fn encoded_size(&self) -> u32 {
            // NOTE(review): nothing visible in this file explains the extra
            // `2`. `Response::decode` discards the leading u32, so a wrong
            // value here would not be caught by the round-trip tests.
            // Confirm against `Header::encoded_size`.
            let header_size = self.header.encoded_size();
            let payload_size = self.kind.encoded_size();
            header_size + 2 + payload_size
        }

        fn encode(
            &self,
            writer: &mut impl std::io::Write,
        ) -> Result<(), crate::error::EncodeError> {
            let frame_size = self.encoded_size();
            writer.write_u32::<BigEndian>(frame_size)?;
            self.header.encode(writer)?;
            self.kind.encode(writer)
        }
    }

    /// Round-trips a response built from an explicit payload expression and
    /// checks that decoding yields an equal value with no bytes left over.
    macro_rules! response_payload_test {
        ($payload:expr, $variant:path, $cmd:expr) => {
            let original = Response {
                header: Header::new($cmd, PROTOCOL_VERSION),
                kind: $variant($payload),
            };

            let mut bytes = Vec::new();
            original.encode(&mut bytes).unwrap();

            let (rest, round_tripped) = Response::decode(&bytes).unwrap();

            assert_eq!(original, round_tripped);
            assert!(rest.is_empty());
        };
    }

    /// Same as `response_payload_test!`, with the payload of type `$ty`
    /// generated by `fake`.
    macro_rules! response_test {
        ($ty:ty, $variant:path, $cmd:expr) => {
            use fake::{Fake, Faker};
            let payload: $ty = Faker.fake();
            response_payload_test!(payload, $variant, $cmd);
        };
    }

    #[test]
    fn open_response_ok_test() {
        use fake::{Fake, Faker};

        // The generated payload has its code pinned to `Ok` before the round trip.
        let mut open: OpenResponse = Faker.fake();
        open.code = ResponseCode::Ok;
        response_payload_test!(open, ResponseKind::Open, COMMAND_OPEN);
    }

    #[test]
    fn peer_properties_response_test() {
        response_test!(
            PeerPropertiesResponse,
            ResponseKind::PeerProperties,
            COMMAND_PEER_PROPERTIES
        );
    }

    #[test]
    fn sasl_handshake_response_test() {
        response_test!(
            SaslHandshakeResponse,
            ResponseKind::SaslHandshake,
            COMMAND_SASL_HANDSHAKE
        );
    }

    #[test]
    fn generic_response_test() {
        response_test!(
            GenericResponse,
            ResponseKind::Generic,
            COMMAND_SASL_AUTHENTICATE
        );
    }

    #[test]
    fn tune_response_test() {
        response_test!(TunesCommand, ResponseKind::Tunes, COMMAND_TUNE);
    }

    #[test]
    fn metadata_response_test() {
        response_test!(MetadataResponse, ResponseKind::Metadata, COMMAND_METADATA);
    }

    #[test]
    fn close_response_test() {
        response_test!(CloseResponse, ResponseKind::Close, COMMAND_CLOSE);
    }

    #[test]
    fn deliver_response_test() {
        response_test!(DeliverCommand, ResponseKind::Deliver, COMMAND_DELIVER);
    }

    #[test]
    fn metadata_update_response_test() {
        response_test!(
            MetadataUpdateCommand,
            ResponseKind::MetadataUpdate,
            COMMAND_METADATA_UPDATE
        );
    }

    #[test]
    fn publish_confirm_response_test() {
        response_test!(
            PublishConfirm,
            ResponseKind::PublishConfirm,
            COMMAND_PUBLISH_CONFIRM
        );
    }

    #[test]
    fn publish_error_response_test() {
        response_test!(
            PublishErrorResponse,
            ResponseKind::PublishError,
            COMMAND_PUBLISH_ERROR
        );
    }

    #[test]
    fn query_offset_response_test() {
        response_test!(
            QueryOffsetResponse,
            ResponseKind::QueryOffset,
            COMMAND_QUERY_OFFSET
        );
    }

    #[test]
    fn query_publisher_response_test() {
        response_test!(
            QueryPublisherResponse,
            ResponseKind::QueryPublisherSequence,
            COMMAND_QUERY_PUBLISHER_SEQUENCE
        );
    }

    #[test]
    fn heartbeat_response_test() {
        response_test!(
            HeartbeatResponse,
            ResponseKind::Heartbeat,
            COMMAND_HEARTBEAT
        );
    }

    #[test]
    fn exchange_command_versions_response_test() {
        response_test!(
            ExchangeCommandVersionsResponse,
            ResponseKind::ExchangeCommandVersions,
            COMMAND_EXCHANGE_COMMAND_VERSIONS
        );
    }

    #[test]
    fn super_stream_partitions_response_test() {
        response_test!(
            SuperStreamPartitionsResponse,
            ResponseKind::SuperStreamPartitions,
            COMMAND_PARTITIONS
        );
    }

    #[test]
    fn super_stream_route_response_test() {
        response_test!(
            SuperStreamRouteResponse,
            ResponseKind::SuperStreamRoute,
            COMMAND_ROUTE
        );
    }

    #[test]
    fn consumer_update_response_test() {
        response_test!(
            ConsumerUpdateCommand,
            ResponseKind::ConsumerUpdate,
            COMMAND_CONSUMER_UPDATE
        );
    }
}