rabbitmq_stream_protocol/request/
mod.rs

1use std::io::Write;
2
3use crate::{
4    codec::{decoder::read_u32, Decoder, Encoder},
5    commands::{
6        close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
7        create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
8        credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
9        delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand,
10        exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
11        metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand,
12        publish::PublishCommand, query_offset::QueryOffsetRequest,
13        query_publisher_sequence::QueryPublisherRequest,
14        sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
15        store_offset::StoreOffset, subscribe::SubscribeCommand,
16        superstream_partitions::SuperStreamPartitionsRequest,
17        superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
18        unsubscribe::UnSubscribeCommand,
19    },
20    error::{DecodeError, EncodeError},
21    protocol::commands::*,
22    types::Header,
23};
24
25use byteorder::{BigEndian, WriteBytesExt};
26
27mod shims;
28#[derive(Debug, PartialEq, Eq)]
29pub struct Request {
30    header: Header,
31    kind: RequestKind,
32}
33
34impl Request {
35    /// Get a reference to the request's kind.
36    pub fn kind(&self) -> &RequestKind {
37        &self.kind
38    }
39
40    /// Get a reference to the request's header.
41    pub fn header(&self) -> &Header {
42        &self.header
43    }
44}
45#[derive(Debug, PartialEq, Eq)]
46pub enum RequestKind {
47    PeerProperties(PeerPropertiesCommand),
48    SaslHandshake(SaslHandshakeCommand),
49    SaslAuthenticate(SaslAuthenticateCommand),
50    Tunes(TunesCommand),
51    Open(OpenCommand),
52    Close(CloseRequest),
53    Delete(Delete),
54    CreateStream(CreateStreamCommand),
55    Subscribe(SubscribeCommand),
56    Credit(CreditCommand),
57    Metadata(MetadataCommand),
58    DeclarePublisher(DeclarePublisherCommand),
59    DeletePublisher(DeletePublisherCommand),
60    Heartbeat(HeartBeatCommand),
61    Publish(PublishCommand),
62    QueryOffset(QueryOffsetRequest),
63    QueryPublisherSequence(QueryPublisherRequest),
64    StoreOffset(StoreOffset),
65    Unsubscribe(UnSubscribeCommand),
66    ExchangeCommandVersions(ExchangeCommandVersionsRequest),
67    CreateSuperStream(CreateSuperStreamCommand),
68    DeleteSuperStream(DeleteSuperStreamCommand),
69    SuperStreamPartitions(SuperStreamPartitionsRequest),
70    SuperStreamRoute(SuperStreamRouteRequest),
71    ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
72}
73
74impl Encoder for RequestKind {
75    fn encoded_size(&self) -> u32 {
76        match self {
77            RequestKind::PeerProperties(peer) => peer.encoded_size(),
78            RequestKind::SaslHandshake(handshake) => handshake.encoded_size(),
79            RequestKind::SaslAuthenticate(authenticate) => authenticate.encoded_size(),
80            RequestKind::Tunes(tunes) => tunes.encoded_size(),
81            RequestKind::Open(open) => open.encoded_size(),
82            RequestKind::Delete(delete) => delete.encoded_size(),
83            RequestKind::CreateStream(create_stream) => create_stream.encoded_size(),
84            RequestKind::Subscribe(subscribe) => subscribe.encoded_size(),
85            RequestKind::Credit(credit) => credit.encoded_size(),
86            RequestKind::Metadata(metadata) => metadata.encoded_size(),
87            RequestKind::Close(close) => close.encoded_size(),
88            RequestKind::DeclarePublisher(declare_publisher) => declare_publisher.encoded_size(),
89            RequestKind::DeletePublisher(delete_publisher) => delete_publisher.encoded_size(),
90            RequestKind::Heartbeat(heartbeat) => heartbeat.encoded_size(),
91            RequestKind::Publish(publish) => publish.encoded_size(),
92            RequestKind::QueryOffset(query_offset) => query_offset.encoded_size(),
93            RequestKind::QueryPublisherSequence(query_publisher) => query_publisher.encoded_size(),
94            RequestKind::StoreOffset(store_offset) => store_offset.encoded_size(),
95            RequestKind::Unsubscribe(unsubscribe) => unsubscribe.encoded_size(),
96            RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
97                exchange_command_versions.encoded_size()
98            }
99            RequestKind::CreateSuperStream(create_super_stream) => {
100                create_super_stream.encoded_size()
101            }
102            RequestKind::DeleteSuperStream(delete_super_stream) => {
103                delete_super_stream.encoded_size()
104            }
105            RequestKind::SuperStreamPartitions(super_stream_partitions) => {
106                super_stream_partitions.encoded_size()
107            }
108            RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(),
109            RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
110                consumer_update_request.encoded_size()
111            }
112        }
113    }
114
115    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
116        match self {
117            RequestKind::PeerProperties(peer) => peer.encode(writer),
118            RequestKind::SaslHandshake(handshake) => handshake.encode(writer),
119            RequestKind::SaslAuthenticate(authenticate) => authenticate.encode(writer),
120            RequestKind::Tunes(tunes) => tunes.encode(writer),
121            RequestKind::Open(open) => open.encode(writer),
122            RequestKind::Delete(delete) => delete.encode(writer),
123            RequestKind::CreateStream(create_stream) => create_stream.encode(writer),
124            RequestKind::Subscribe(subscribe) => subscribe.encode(writer),
125            RequestKind::Credit(credit) => credit.encode(writer),
126            RequestKind::Metadata(metadata) => metadata.encode(writer),
127            RequestKind::Close(close) => close.encode(writer),
128            RequestKind::DeclarePublisher(declare_publisher) => declare_publisher.encode(writer),
129            RequestKind::DeletePublisher(delete_publisher) => delete_publisher.encode(writer),
130            RequestKind::Heartbeat(heartbeat) => heartbeat.encode(writer),
131            RequestKind::Publish(publish) => publish.encode(writer),
132            RequestKind::QueryOffset(query_offset) => query_offset.encode(writer),
133            RequestKind::QueryPublisherSequence(query_publisher) => query_publisher.encode(writer),
134            RequestKind::StoreOffset(store_offset) => store_offset.encode(writer),
135            RequestKind::Unsubscribe(unsubcribe) => unsubcribe.encode(writer),
136            RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
137                exchange_command_versions.encode(writer)
138            }
139            RequestKind::CreateSuperStream(create_super_stream) => {
140                create_super_stream.encode(writer)
141            }
142            RequestKind::DeleteSuperStream(delete_super_stream) => {
143                delete_super_stream.encode(writer)
144            }
145            RequestKind::SuperStreamPartitions(super_stream_partition) => {
146                super_stream_partition.encode(writer)
147            }
148            RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer),
149            RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
150                consumer_update_request.encode(writer)
151            }
152        }
153    }
154}
155impl Encoder for Request {
156    fn encoded_size(&self) -> u32 {
157        self.header.encoded_size() + self.kind.encoded_size()
158    }
159
160    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
161        writer.write_u32::<BigEndian>(self.encoded_size())?;
162        self.header.encode(writer)?;
163        self.kind.encode(writer)?;
164        Ok(())
165    }
166}
167
168impl Decoder for Request {
169    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
170        let (input, _) = read_u32(input)?;
171        let (input, header) = Header::decode(input)?;
172
173        let (input, cmd) = match header.key() {
174            COMMAND_OPEN => OpenCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
175            COMMAND_PEER_PROPERTIES => {
176                PeerPropertiesCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
177            }
178            COMMAND_SASL_HANDSHAKE => {
179                SaslHandshakeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
180            }
181            COMMAND_SASL_AUTHENTICATE => {
182                SaslAuthenticateCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
183            }
184            COMMAND_TUNE => TunesCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
185            COMMAND_DELETE_STREAM => Delete::decode(input).map(|(i, kind)| (i, kind.into()))?,
186            COMMAND_CREATE_STREAM => {
187                CreateStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
188            }
189            COMMAND_METADATA => MetadataCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
190            COMMAND_CLOSE => CloseRequest::decode(input).map(|(i, kind)| (i, kind.into()))?,
191            COMMAND_CREDIT => CreditCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
192            COMMAND_DECLARE_PUBLISHER => {
193                DeclarePublisherCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
194            }
195            COMMAND_DELETE_PUBLISHER => {
196                DeletePublisherCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
197            }
198
199            COMMAND_HEARTBEAT => {
200                HeartBeatCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
201            }
202            COMMAND_PUBLISH => PublishCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
203            COMMAND_QUERY_OFFSET => {
204                QueryOffsetRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
205            }
206            COMMAND_QUERY_PUBLISHER_SEQUENCE => {
207                QueryPublisherRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
208            }
209
210            COMMAND_STORE_OFFSET => StoreOffset::decode(input).map(|(i, kind)| (i, kind.into()))?,
211            COMMAND_SUBSCRIBE => {
212                SubscribeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
213            }
214            COMMAND_UNSUBSCRIBE => {
215                UnSubscribeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
216            }
217            COMMAND_EXCHANGE_COMMAND_VERSIONS => {
218                ExchangeCommandVersionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
219            }
220            COMMAND_CREATE_SUPER_STREAM => {
221                CreateSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
222            }
223            COMMAND_DELETE_SUPER_STREAM => {
224                DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
225            }
226            COMMAND_PARTITIONS => {
227                SuperStreamPartitionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
228            }
229            COMMAND_ROUTE => {
230                SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
231            }
232            COMMAND_CONSUMER_UPDATE_REQUEST => {
233                ConsumerUpdateRequestCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
234            }
235            n => return Err(DecodeError::UnsupportedResponseType(n)),
236        };
237        Ok((input, Request { header, kind: cmd }))
238    }
239}
240
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use fake::{Dummy, Fake, Faker};

    use super::Request;
    use crate::{
        codec::{Decoder, Encoder},
        commands::{
            close::CloseRequest, create_stream::CreateStreamCommand,
            create_super_stream::CreateSuperStreamCommand, credit::CreditCommand,
            declare_publisher::DeclarePublisherCommand, delete::Delete,
            delete_publisher::DeletePublisherCommand,
            delete_super_stream::DeleteSuperStreamCommand,
            exchange_command_versions::ExchangeCommandVersionsRequest,
            heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand,
            peer_properties::PeerPropertiesCommand, publish::PublishCommand,
            query_offset::QueryOffsetRequest, query_publisher_sequence::QueryPublisherRequest,
            sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
            store_offset::StoreOffset, subscribe::SubscribeCommand,
            superstream_partitions::SuperStreamPartitionsRequest,
            superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
            unsubscribe::UnSubscribeCommand, Command,
        },
    };

    /// Round-trips a randomly generated command of type `T` through
    /// `Request::encode` and `Request::decode`.
    ///
    /// Checks, in order, that the bytes after the 4-byte size prefix match the
    /// decoded request's `encoded_size`, that the decoded request equals the
    /// original, and that decoding consumed the whole buffer.
    fn request_encode_decode_test<T>()
    where
        T: Dummy<Faker> + Encoder + Decoder + Debug + PartialEq + Command + Into<Request>,
    {
        let command = Faker.fake::<T>();
        let original: Request = command.into();

        let mut bytes: Vec<u8> = Vec::new();
        original.encode(&mut bytes).unwrap();

        let (leftover, roundtripped) = Request::decode(&bytes).unwrap();

        // The first four bytes are the size prefix, which `encoded_size` excludes.
        let frame = &bytes[4..];
        assert_eq!(frame.len(), roundtripped.encoded_size() as usize);

        assert_eq!(original, roundtripped);

        assert!(leftover.is_empty());
    }

    #[test]
    fn request_open_test() {
        request_encode_decode_test::<OpenCommand>()
    }

    #[test]
    fn request_peer_properties_test() {
        request_encode_decode_test::<PeerPropertiesCommand>()
    }

    #[test]
    fn request_create_stream_test() {
        request_encode_decode_test::<CreateStreamCommand>()
    }

    #[test]
    fn request_delete_stream_test() {
        request_encode_decode_test::<Delete>()
    }

    #[test]
    fn request_sasl_authenticate_test() {
        request_encode_decode_test::<SaslAuthenticateCommand>()
    }

    #[test]
    fn request_sasl_handshake_test() {
        request_encode_decode_test::<SaslHandshakeCommand>()
    }

    #[test]
    fn request_tune_test() {
        request_encode_decode_test::<TunesCommand>()
    }

    #[test]
    fn request_metadata_test() {
        request_encode_decode_test::<MetadataCommand>()
    }

    #[test]
    fn request_close_test() {
        request_encode_decode_test::<CloseRequest>()
    }

    #[test]
    fn request_credit_test() {
        request_encode_decode_test::<CreditCommand>()
    }

    #[test]
    fn request_declare_publisher_test() {
        request_encode_decode_test::<DeclarePublisherCommand>()
    }

    #[test]
    fn request_delete_publisher_test() {
        request_encode_decode_test::<DeletePublisherCommand>()
    }

    #[test]
    fn request_heartbeat_test() {
        request_encode_decode_test::<HeartBeatCommand>()
    }

    #[test]
    fn request_publish_test() {
        request_encode_decode_test::<PublishCommand>()
    }

    #[test]
    fn request_query_offset_test() {
        request_encode_decode_test::<QueryOffsetRequest>()
    }

    #[test]
    fn request_query_publisher_test() {
        request_encode_decode_test::<QueryPublisherRequest>()
    }

    #[test]
    fn request_store_offset_test() {
        request_encode_decode_test::<StoreOffset>()
    }

    #[test]
    fn request_subscribe_test() {
        request_encode_decode_test::<SubscribeCommand>()
    }

    #[test]
    fn request_unsubscribe_test() {
        request_encode_decode_test::<UnSubscribeCommand>()
    }

    #[test]
    fn request_exchange_command_versions_test() {
        request_encode_decode_test::<ExchangeCommandVersionsRequest>()
    }

    #[test]
    fn request_create_super_stream_test() {
        request_encode_decode_test::<CreateSuperStreamCommand>()
    }

    #[test]
    fn request_delete_super_stream_test() {
        request_encode_decode_test::<DeleteSuperStreamCommand>()
    }

    #[test]
    fn request_partitions_command() {
        request_encode_decode_test::<SuperStreamPartitionsRequest>()
    }

    #[test]
    fn request_route_command() {
        request_encode_decode_test::<SuperStreamRouteRequest>()
    }
}