// rabbitmq_stream_protocol/request/mod.rs
use std::io::Write;

use crate::{
    codec::{decoder::read_u32, Decoder, Encoder},
    commands::{
        close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
        create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
        credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
        delete_publisher::DeletePublisherCommand, delete_super_stream::DeleteSuperStreamCommand,
        exchange_command_versions::ExchangeCommandVersionsRequest, heart_beat::HeartBeatCommand,
        metadata::MetadataCommand, open::OpenCommand, peer_properties::PeerPropertiesCommand,
        publish::PublishCommand, query_offset::QueryOffsetRequest,
        query_publisher_sequence::QueryPublisherRequest,
        sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
        store_offset::StoreOffset, subscribe::SubscribeCommand,
        superstream_partitions::SuperStreamPartitionsRequest,
        superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
        unsubscribe::UnSubscribeCommand,
    },
    error::{DecodeError, EncodeError},
    protocol::commands::*,
    types::Header,
};

use byteorder::{BigEndian, WriteBytesExt};

mod shims;
/// A single request frame: a [`Header`] followed by a command payload.
///
/// When encoded, the frame is preceded by a big-endian `u32` holding the
/// combined encoded size of the header and the payload.
#[derive(Debug, PartialEq, Eq)]
pub struct Request {
    /// Frame header; its `key()` selects which command is decoded.
    header: Header,
    /// The command payload carried by this request.
    kind: RequestKind,
}

impl Request {
    /// Borrows the header carried by this request.
    pub fn header(&self) -> &Header {
        let Self { header, .. } = self;
        header
    }

    /// Borrows the command payload carried by this request.
    pub fn kind(&self) -> &RequestKind {
        let Self { kind, .. } = self;
        kind
    }
}
/// The command payload of a [`Request`].
///
/// Each variant wraps the concrete command type for one kind of request.
/// Encoding is delegated to the wrapped command.
#[derive(Debug, PartialEq, Eq)]
pub enum RequestKind {
    PeerProperties(PeerPropertiesCommand),
    SaslHandshake(SaslHandshakeCommand),
    SaslAuthenticate(SaslAuthenticateCommand),
    Tunes(TunesCommand),
    Open(OpenCommand),
    Close(CloseRequest),
    Delete(Delete),
    CreateStream(CreateStreamCommand),
    Subscribe(SubscribeCommand),
    Credit(CreditCommand),
    Metadata(MetadataCommand),
    DeclarePublisher(DeclarePublisherCommand),
    DeletePublisher(DeletePublisherCommand),
    Heartbeat(HeartBeatCommand),
    Publish(PublishCommand),
    QueryOffset(QueryOffsetRequest),
    QueryPublisherSequence(QueryPublisherRequest),
    StoreOffset(StoreOffset),
    Unsubscribe(UnSubscribeCommand),
    ExchangeCommandVersions(ExchangeCommandVersionsRequest),
    CreateSuperStream(CreateSuperStreamCommand),
    DeleteSuperStream(DeleteSuperStreamCommand),
    SuperStreamPartitions(SuperStreamPartitionsRequest),
    SuperStreamRoute(SuperStreamRouteRequest),
    ConsumerUpdateRequest(ConsumerUpdateRequestCommand),
}

impl Encoder for RequestKind {
    fn encoded_size(&self) -> u32 {
        match self {
            RequestKind::PeerProperties(peer) => peer.encoded_size(),
            RequestKind::SaslHandshake(handshake) => handshake.encoded_size(),
            RequestKind::SaslAuthenticate(authenticate) => authenticate.encoded_size(),
            RequestKind::Tunes(tunes) => tunes.encoded_size(),
            RequestKind::Open(open) => open.encoded_size(),
            RequestKind::Delete(delete) => delete.encoded_size(),
            RequestKind::CreateStream(create_stream) => create_stream.encoded_size(),
            RequestKind::Subscribe(subscribe) => subscribe.encoded_size(),
            RequestKind::Credit(credit) => credit.encoded_size(),
            RequestKind::Metadata(metadata) => metadata.encoded_size(),
            RequestKind::Close(close) => close.encoded_size(),
            RequestKind::DeclarePublisher(declare_publisher) => declare_publisher.encoded_size(),
            RequestKind::DeletePublisher(delete_publisher) => delete_publisher.encoded_size(),
            RequestKind::Heartbeat(heartbeat) => heartbeat.encoded_size(),
            RequestKind::Publish(publish) => publish.encoded_size(),
            RequestKind::QueryOffset(query_offset) => query_offset.encoded_size(),
            RequestKind::QueryPublisherSequence(query_publisher) => query_publisher.encoded_size(),
            RequestKind::StoreOffset(store_offset) => store_offset.encoded_size(),
            RequestKind::Unsubscribe(unsubscribe) => unsubscribe.encoded_size(),
            RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
                exchange_command_versions.encoded_size()
            }
            RequestKind::CreateSuperStream(create_super_stream) => {
                create_super_stream.encoded_size()
            }
            RequestKind::DeleteSuperStream(delete_super_stream) => {
                delete_super_stream.encoded_size()
            }
            RequestKind::SuperStreamPartitions(super_stream_partitions) => {
                super_stream_partitions.encoded_size()
            }
            RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encoded_size(),
            RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
                consumer_update_request.encoded_size()
            }
        }
    }

    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
        match self {
            RequestKind::PeerProperties(peer) => peer.encode(writer),
            RequestKind::SaslHandshake(handshake) => handshake.encode(writer),
            RequestKind::SaslAuthenticate(authenticate) => authenticate.encode(writer),
            RequestKind::Tunes(tunes) => tunes.encode(writer),
            RequestKind::Open(open) => open.encode(writer),
            RequestKind::Delete(delete) => delete.encode(writer),
            RequestKind::CreateStream(create_stream) => create_stream.encode(writer),
            RequestKind::Subscribe(subscribe) => subscribe.encode(writer),
            RequestKind::Credit(credit) => credit.encode(writer),
            RequestKind::Metadata(metadata) => metadata.encode(writer),
            RequestKind::Close(close) => close.encode(writer),
            RequestKind::DeclarePublisher(declare_publisher) => declare_publisher.encode(writer),
            RequestKind::DeletePublisher(delete_publisher) => delete_publisher.encode(writer),
            RequestKind::Heartbeat(heartbeat) => heartbeat.encode(writer),
            RequestKind::Publish(publish) => publish.encode(writer),
            RequestKind::QueryOffset(query_offset) => query_offset.encode(writer),
            RequestKind::QueryPublisherSequence(query_publisher) => query_publisher.encode(writer),
            RequestKind::StoreOffset(store_offset) => store_offset.encode(writer),
            RequestKind::Unsubscribe(unsubcribe) => unsubcribe.encode(writer),
            RequestKind::ExchangeCommandVersions(exchange_command_versions) => {
                exchange_command_versions.encode(writer)
            }
            RequestKind::CreateSuperStream(create_super_stream) => {
                create_super_stream.encode(writer)
            }
            RequestKind::DeleteSuperStream(delete_super_stream) => {
                delete_super_stream.encode(writer)
            }
            RequestKind::SuperStreamPartitions(super_stream_partition) => {
                super_stream_partition.encode(writer)
            }
            RequestKind::SuperStreamRoute(super_stream_route) => super_stream_route.encode(writer),
            RequestKind::ConsumerUpdateRequest(consumer_update_request) => {
                consumer_update_request.encode(writer)
            }
        }
    }
}
impl Encoder for Request {
    /// Combined encoded size of the header and the command payload.
    ///
    /// The `u32` length prefix written by [`Encoder::encode`] is not included.
    fn encoded_size(&self) -> u32 {
        let Self { header, kind } = self;
        header.encoded_size() + kind.encoded_size()
    }

    /// Writes the frame as: big-endian `u32` size, then header, then payload.
    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
        let frame_len = self.encoded_size();
        writer.write_u32::<BigEndian>(frame_len)?;
        self.header.encode(writer)?;
        self.kind.encode(writer)
    }
}

impl Decoder for Request {
    fn decode(input: &[u8]) -> Result<(&[u8], Self), crate::error::DecodeError> {
        let (input, _) = read_u32(input)?;
        let (input, header) = Header::decode(input)?;

        let (input, cmd) = match header.key() {
            COMMAND_OPEN => OpenCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
            COMMAND_PEER_PROPERTIES => {
                PeerPropertiesCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_SASL_HANDSHAKE => {
                SaslHandshakeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_SASL_AUTHENTICATE => {
                SaslAuthenticateCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_TUNE => TunesCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
            COMMAND_DELETE_STREAM => Delete::decode(input).map(|(i, kind)| (i, kind.into()))?,
            COMMAND_CREATE_STREAM => {
                CreateStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_METADATA => MetadataCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
            COMMAND_CLOSE => CloseRequest::decode(input).map(|(i, kind)| (i, kind.into()))?,
            COMMAND_CREDIT => CreditCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
            COMMAND_DECLARE_PUBLISHER => {
                DeclarePublisherCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_DELETE_PUBLISHER => {
                DeletePublisherCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }

            COMMAND_HEARTBEAT => {
                HeartBeatCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_PUBLISH => PublishCommand::decode(input).map(|(i, kind)| (i, kind.into()))?,
            COMMAND_QUERY_OFFSET => {
                QueryOffsetRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_QUERY_PUBLISHER_SEQUENCE => {
                QueryPublisherRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
            }

            COMMAND_STORE_OFFSET => StoreOffset::decode(input).map(|(i, kind)| (i, kind.into()))?,
            COMMAND_SUBSCRIBE => {
                SubscribeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_UNSUBSCRIBE => {
                UnSubscribeCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_EXCHANGE_COMMAND_VERSIONS => {
                ExchangeCommandVersionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_CREATE_SUPER_STREAM => {
                CreateSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_DELETE_SUPER_STREAM => {
                DeleteSuperStreamCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_PARTITIONS => {
                SuperStreamPartitionsRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_ROUTE => {
                SuperStreamRouteRequest::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            COMMAND_CONSUMER_UPDATE_REQUEST => {
                ConsumerUpdateRequestCommand::decode(input).map(|(i, kind)| (i, kind.into()))?
            }
            n => return Err(DecodeError::UnsupportedResponseType(n)),
        };
        Ok((input, Request { header, kind: cmd }))
    }
}

#[cfg(test)]
mod tests {

    use crate::{
        codec::{Decoder, Encoder},
        commands::{
            close::CloseRequest, consumer_update_request::ConsumerUpdateRequestCommand,
            create_stream::CreateStreamCommand, create_super_stream::CreateSuperStreamCommand,
            credit::CreditCommand, declare_publisher::DeclarePublisherCommand, delete::Delete,
            delete_publisher::DeletePublisherCommand,
            delete_super_stream::DeleteSuperStreamCommand,
            exchange_command_versions::ExchangeCommandVersionsRequest,
            heart_beat::HeartBeatCommand, metadata::MetadataCommand, open::OpenCommand,
            peer_properties::PeerPropertiesCommand, publish::PublishCommand,
            query_offset::QueryOffsetRequest, query_publisher_sequence::QueryPublisherRequest,
            sasl_authenticate::SaslAuthenticateCommand, sasl_handshake::SaslHandshakeCommand,
            store_offset::StoreOffset, subscribe::SubscribeCommand,
            superstream_partitions::SuperStreamPartitionsRequest,
            superstream_route::SuperStreamRouteRequest, tune::TunesCommand,
            unsubscribe::UnSubscribeCommand, Command,
        },
    };

    use std::fmt::Debug;

    use super::Request;
    use fake::{Dummy, Fake, Faker};

    /// Round-trips a randomly generated command `T` through `Request`
    /// encoding and decoding.
    ///
    /// Checks that the length prefix matches the reported encoded size, that
    /// the bytes after the prefix have that size, that the decoded request
    /// equals the original, and that decoding consumed the whole buffer.
    fn request_encode_decode_test<T>()
    where
        T: Dummy<Faker> + Encoder + Decoder + Debug + PartialEq + Command + Into<Request>,
    {
        let command: T = Faker.fake();

        let request: Request = command.into();

        let mut buffer = vec![];
        request.encode(&mut buffer).unwrap();

        // `Request::encode` starts the frame with a big-endian u32 size.
        let mut prefix = [0u8; 4];
        prefix.copy_from_slice(&buffer[..4]);
        assert_eq!(u32::from_be_bytes(prefix), request.encoded_size());

        let (remaining, decoded) = Request::decode(&buffer).unwrap();

        assert_eq!(buffer[4..].len(), decoded.encoded_size() as usize);

        assert_eq!(request, decoded);

        assert!(remaining.is_empty());
    }

    #[test]
    fn request_open_test() {
        request_encode_decode_test::<OpenCommand>()
    }

    #[test]
    fn request_peer_properties_test() {
        request_encode_decode_test::<PeerPropertiesCommand>()
    }

    #[test]
    fn request_create_stream_test() {
        request_encode_decode_test::<CreateStreamCommand>()
    }

    #[test]
    fn request_delete_stream_test() {
        request_encode_decode_test::<Delete>()
    }

    #[test]
    fn request_sasl_authenticate_test() {
        request_encode_decode_test::<SaslAuthenticateCommand>()
    }

    #[test]
    fn request_sasl_handshake_test() {
        request_encode_decode_test::<SaslHandshakeCommand>()
    }

    #[test]
    fn request_tune_test() {
        request_encode_decode_test::<TunesCommand>()
    }

    #[test]
    fn request_metadata_test() {
        request_encode_decode_test::<MetadataCommand>()
    }

    #[test]
    fn request_close_test() {
        request_encode_decode_test::<CloseRequest>()
    }

    #[test]
    fn request_credit_test() {
        request_encode_decode_test::<CreditCommand>()
    }

    #[test]
    fn request_declare_publisher_test() {
        request_encode_decode_test::<DeclarePublisherCommand>()
    }

    #[test]
    fn request_delete_publisher_test() {
        request_encode_decode_test::<DeletePublisherCommand>()
    }

    #[test]
    fn request_heartbeat_test() {
        request_encode_decode_test::<HeartBeatCommand>()
    }

    #[test]
    fn request_publish_test() {
        request_encode_decode_test::<PublishCommand>()
    }

    #[test]
    fn request_query_offset_test() {
        request_encode_decode_test::<QueryOffsetRequest>()
    }

    #[test]
    fn request_query_publisher_test() {
        request_encode_decode_test::<QueryPublisherRequest>()
    }

    #[test]
    fn request_store_offset_test() {
        request_encode_decode_test::<StoreOffset>()
    }

    #[test]
    fn request_subscribe_test() {
        request_encode_decode_test::<SubscribeCommand>()
    }

    #[test]
    fn request_unsubscribe_test() {
        request_encode_decode_test::<UnSubscribeCommand>()
    }

    #[test]
    fn request_exchange_command_versions_test() {
        request_encode_decode_test::<ExchangeCommandVersionsRequest>()
    }

    #[test]
    fn request_create_super_stream_test() {
        request_encode_decode_test::<CreateSuperStreamCommand>()
    }

    #[test]
    fn request_delete_super_stream_test() {
        request_encode_decode_test::<DeleteSuperStreamCommand>()
    }

    #[test]
    fn request_partitions_command() {
        request_encode_decode_test::<SuperStreamPartitionsRequest>()
    }

    #[test]
    fn request_route_command() {
        request_encode_decode_test::<SuperStreamRouteRequest>()
    }

    // This command was imported above but had no round-trip test.
    // NOTE(review): assumes `ConsumerUpdateRequestCommand` satisfies the
    // helper's bounds (`Dummy<Faker>`, `Command`, `Into<Request>`) like the
    // other commands — confirm.
    #[test]
    fn request_consumer_update_request_test() {
        request_encode_decode_test::<ConsumerUpdateRequestCommand>()
    }
}