rabbitmq_stream_protocol/commands/
create_stream.rs1use std::collections::HashMap;
2use std::io::Write;
3
4use crate::{
5 codec::{Decoder, Encoder},
6 error::{DecodeError, EncodeError},
7 protocol::commands::COMMAND_CREATE_STREAM,
8};
9
10use super::Command;
11
12#[cfg_attr(test, derive(fake::Dummy))]
13#[derive(PartialEq, Eq, Debug)]
14pub struct CreateStreamCommand {
15 correlation_id: u32,
16 stream_name: String,
17 args: HashMap<String, String>,
18}
19
20impl CreateStreamCommand {
21 pub fn new(correlation_id: u32, stream_name: String, args: HashMap<String, String>) -> Self {
22 Self {
23 correlation_id,
24 stream_name,
25 args,
26 }
27 }
28}
29
30impl Encoder for CreateStreamCommand {
31 fn encoded_size(&self) -> u32 {
32 self.correlation_id.encoded_size()
33 + self.stream_name.as_str().encoded_size()
34 + self.args.encoded_size()
35 }
36
37 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
38 self.correlation_id.encode(writer)?;
39 self.stream_name.as_str().encode(writer)?;
40 self.args.encode(writer)?;
41 Ok(())
42 }
43}
44
45impl Command for CreateStreamCommand {
46 fn key(&self) -> u16 {
47 COMMAND_CREATE_STREAM
48 }
49}
50
51impl Decoder for CreateStreamCommand {
52 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
53 let (input, correlation_id) = u32::decode(input)?;
54 let (input, stream_name) = Option::decode(input)?;
55 let (input, args) = HashMap::decode(input)?;
56
57 Ok((
58 input,
59 CreateStreamCommand {
60 correlation_id,
61 stream_name: stream_name.unwrap(),
62 args,
63 },
64 ))
65 }
66}
67
68#[cfg(test)]
69mod tests {
70
71 use crate::commands::tests::command_encode_decode_test;
72
73 use super::CreateStreamCommand;
74
75 #[test]
76 fn create_stream_request_test() {
77 command_encode_decode_test::<CreateStreamCommand>();
78 }
79}