rabbitmq_stream_protocol/commands/
create_stream.rs

1use std::collections::HashMap;
2use std::io::Write;
3
4use crate::{
5    codec::{Decoder, Encoder},
6    error::{DecodeError, EncodeError},
7    protocol::commands::COMMAND_CREATE_STREAM,
8};
9
10use super::Command;
11
/// Request to create a stream, as exchanged over the stream protocol.
///
/// Equality compares all three fields. In test builds the type also derives
/// `fake::Dummy`.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Debug, PartialEq, Eq)]
pub struct CreateStreamCommand {
    /// Identifier carried with the request (presumably echoed back in the
    /// matching response -- confirm against the response type).
    correlation_id: u32,
    /// Name of the stream to create.
    stream_name: String,
    /// Key/value arguments sent along with the creation request.
    args: HashMap<String, String>,
}
19
20impl CreateStreamCommand {
21    pub fn new(correlation_id: u32, stream_name: String, args: HashMap<String, String>) -> Self {
22        Self {
23            correlation_id,
24            stream_name,
25            args,
26        }
27    }
28}
29
30impl Encoder for CreateStreamCommand {
31    fn encoded_size(&self) -> u32 {
32        self.correlation_id.encoded_size()
33            + self.stream_name.as_str().encoded_size()
34            + self.args.encoded_size()
35    }
36
37    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
38        self.correlation_id.encode(writer)?;
39        self.stream_name.as_str().encode(writer)?;
40        self.args.encode(writer)?;
41        Ok(())
42    }
43}
44
45impl Command for CreateStreamCommand {
46    fn key(&self) -> u16 {
47        COMMAND_CREATE_STREAM
48    }
49}
50
51impl Decoder for CreateStreamCommand {
52    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
53        let (input, correlation_id) = u32::decode(input)?;
54        let (input, stream_name) = Option::decode(input)?;
55        let (input, args) = HashMap::decode(input)?;
56
57        Ok((
58            input,
59            CreateStreamCommand {
60                correlation_id,
61                stream_name: stream_name.unwrap(),
62                args,
63            },
64        ))
65    }
66}
67
#[cfg(test)]
mod tests {
    use super::CreateStreamCommand;
    use crate::commands::tests::command_encode_decode_test;

    /// Runs the shared `command_encode_decode_test` helper against
    /// `CreateStreamCommand`.
    #[test]
    fn create_stream_request_test() {
        command_encode_decode_test::<CreateStreamCommand>();
    }
}