rabbitmq_stream_protocol/commands/
create_super_stream.rs1use std::collections::HashMap;
2use std::io::Write;
3
4use crate::{
5 codec::{Decoder, Encoder},
6 error::{DecodeError, EncodeError},
7 protocol::commands::COMMAND_CREATE_SUPER_STREAM,
8};
9
10use super::Command;
11
/// Payload of the "create super stream" command.
///
/// The fields are declared in the same order in which the `Encoder` and
/// `Decoder` implementations in this file write and read them.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Debug, PartialEq, Eq)]
pub struct CreateSuperStreamCommand {
    /// First field on the wire; presumably used to pair a response with this
    /// request (verify against the protocol specification).
    correlation_id: u32,
    /// Name of the super stream the command refers to.
    super_stream_name: String,
    /// Partition names, encoded as a list of strings.
    partitions: Vec<String>,
    /// Binding keys, encoded as a list of strings.
    binding_keys: Vec<String>,
    /// Additional string key/value arguments sent with the command.
    args: HashMap<String, String>,
}
21
22impl CreateSuperStreamCommand {
23 pub fn new(
24 correlation_id: u32,
25 super_stream_name: String,
26 partitions: Vec<String>,
27 binding_keys: Vec<String>,
28 args: HashMap<String, String>,
29 ) -> Self {
30 Self {
31 correlation_id,
32 super_stream_name,
33 partitions,
34 binding_keys,
35 args,
36 }
37 }
38}
39
40impl Encoder for CreateSuperStreamCommand {
41 fn encoded_size(&self) -> u32 {
42 self.correlation_id.encoded_size()
43 + self.super_stream_name.as_str().encoded_size()
44 + self.partitions.encoded_size()
45 + self.binding_keys.encoded_size()
46 + self.args.encoded_size()
47 }
48
49 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
50 self.correlation_id.encode(writer)?;
51 self.super_stream_name.as_str().encode(writer)?;
52 self.partitions.encode(writer)?;
53 self.binding_keys.encode(writer)?;
54 self.args.encode(writer)?;
55 Ok(())
56 }
57}
58
59impl Decoder for CreateSuperStreamCommand {
60 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
61 let (input, correlation_id) = u32::decode(input)?;
62 let (input, super_stream_name) = Option::decode(input)?;
63 let (input, partitions) = <Vec<String>>::decode(input)?;
64 let (input, binding_keys) = <Vec<String>>::decode(input)?;
65 let (input, args) = HashMap::decode(input)?;
66
67 Ok((
68 input,
69 CreateSuperStreamCommand {
70 correlation_id,
71 super_stream_name: super_stream_name.unwrap(),
72 partitions,
73 binding_keys,
74 args,
75 },
76 ))
77 }
78}
79
80impl Command for CreateSuperStreamCommand {
81 fn key(&self) -> u16 {
82 COMMAND_CREATE_SUPER_STREAM
83 }
84}
85
#[cfg(test)]
mod tests {
    use super::CreateSuperStreamCommand;
    use crate::commands::tests::command_encode_decode_test;

    /// Runs the shared encode/decode test helper against
    /// `CreateSuperStreamCommand`.
    #[test]
    fn create_super_stream_request_test() {
        command_encode_decode_test::<CreateSuperStreamCommand>();
    }
}