rabbitmq_stream_protocol/commands/
create_super_stream.rs

1use std::collections::HashMap;
2use std::io::Write;
3
4use crate::{
5    codec::{Decoder, Encoder},
6    error::{DecodeError, EncodeError},
7    protocol::commands::COMMAND_CREATE_SUPER_STREAM,
8};
9
10use super::Command;
11
12#[cfg_attr(test, derive(fake::Dummy))]
13#[derive(PartialEq, Eq, Debug)]
14pub struct CreateSuperStreamCommand {
15    correlation_id: u32,
16    super_stream_name: String,
17    partitions: Vec<String>,
18    binding_keys: Vec<String>,
19    args: HashMap<String, String>,
20}
21
22impl CreateSuperStreamCommand {
23    pub fn new(
24        correlation_id: u32,
25        super_stream_name: String,
26        partitions: Vec<String>,
27        binding_keys: Vec<String>,
28        args: HashMap<String, String>,
29    ) -> Self {
30        Self {
31            correlation_id,
32            super_stream_name,
33            partitions,
34            binding_keys,
35            args,
36        }
37    }
38}
39
40impl Encoder for CreateSuperStreamCommand {
41    fn encoded_size(&self) -> u32 {
42        self.correlation_id.encoded_size()
43            + self.super_stream_name.as_str().encoded_size()
44            + self.partitions.encoded_size()
45            + self.binding_keys.encoded_size()
46            + self.args.encoded_size()
47    }
48
49    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
50        self.correlation_id.encode(writer)?;
51        self.super_stream_name.as_str().encode(writer)?;
52        self.partitions.encode(writer)?;
53        self.binding_keys.encode(writer)?;
54        self.args.encode(writer)?;
55        Ok(())
56    }
57}
58
59impl Decoder for CreateSuperStreamCommand {
60    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
61        let (input, correlation_id) = u32::decode(input)?;
62        let (input, super_stream_name) = Option::decode(input)?;
63        let (input, partitions) = <Vec<String>>::decode(input)?;
64        let (input, binding_keys) = <Vec<String>>::decode(input)?;
65        let (input, args) = HashMap::decode(input)?;
66
67        Ok((
68            input,
69            CreateSuperStreamCommand {
70                correlation_id,
71                super_stream_name: super_stream_name.unwrap(),
72                partitions,
73                binding_keys,
74                args,
75            },
76        ))
77    }
78}
79
80impl Command for CreateSuperStreamCommand {
81    fn key(&self) -> u16 {
82        COMMAND_CREATE_SUPER_STREAM
83    }
84}
85
86#[cfg(test)]
87mod tests {
88
89    use crate::commands::tests::command_encode_decode_test;
90
91    use super::CreateSuperStreamCommand;
92
93    #[test]
94    fn create_super_stream_request_test() {
95        command_encode_decode_test::<CreateSuperStreamCommand>();
96    }
97}