rabbitmq_stream_protocol/commands/
superstream_partitions.rs

1use std::io::Write;
2
3use super::Command;
4use crate::{
5    codec::{Decoder, Encoder},
6    error::{DecodeError, EncodeError},
7    protocol::commands::COMMAND_PARTITIONS,
8    FromResponse, ResponseCode,
9};
10
11#[cfg_attr(test, derive(fake::Dummy))]
12#[derive(PartialEq, Eq, Debug)]
13pub struct SuperStreamPartitionsRequest {
14    correlation_id: u32,
15    super_stream: String,
16}
17
18impl SuperStreamPartitionsRequest {
19    pub fn new(correlation_id: u32, super_stream: String) -> Self {
20        Self {
21            correlation_id,
22            super_stream,
23        }
24    }
25}
26
27impl Encoder for SuperStreamPartitionsRequest {
28    fn encoded_size(&self) -> u32 {
29        self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size()
30    }
31
32    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
33        self.correlation_id.encode(writer)?;
34        self.super_stream.as_str().encode(writer)?;
35        Ok(())
36    }
37}
38
39impl Decoder for SuperStreamPartitionsRequest {
40    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
41        let (input, correlation_id) = u32::decode(input)?;
42        let (input, super_stream) = Option::decode(input)?;
43
44        Ok((
45            input,
46            SuperStreamPartitionsRequest {
47                correlation_id,
48                super_stream: super_stream.unwrap(),
49            },
50        ))
51    }
52}
53
54impl Command for SuperStreamPartitionsRequest {
55    fn key(&self) -> u16 {
56        COMMAND_PARTITIONS
57    }
58}
59
60#[cfg_attr(test, derive(fake::Dummy))]
61#[derive(PartialEq, Eq, Debug)]
62pub struct SuperStreamPartitionsResponse {
63    pub(crate) correlation_id: u32,
64    response_code: ResponseCode,
65    pub streams: Vec<String>,
66}
67
68impl SuperStreamPartitionsResponse {
69    pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
70        Self {
71            correlation_id,
72            response_code,
73            streams,
74        }
75    }
76    pub fn is_ok(&self) -> bool {
77        self.response_code == ResponseCode::Ok
78    }
79}
80
81impl Encoder for SuperStreamPartitionsResponse {
82    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
83        self.correlation_id.encode(writer)?;
84        self.response_code.encode(writer)?;
85        self.streams.encode(writer)?;
86        Ok(())
87    }
88
89    fn encoded_size(&self) -> u32 {
90        self.correlation_id.encoded_size()
91            + self.response_code.encoded_size()
92            + self.streams.encoded_size()
93    }
94}
95
96impl Decoder for SuperStreamPartitionsResponse {
97    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
98        let (input, correlation_id) = u32::decode(input)?;
99        let (input, response_code) = ResponseCode::decode(input)?;
100        let (input, streams) = <Vec<String>>::decode(input)?;
101
102        Ok((
103            input,
104            SuperStreamPartitionsResponse {
105                correlation_id,
106                response_code,
107                streams,
108            },
109        ))
110    }
111}
112
113impl FromResponse for SuperStreamPartitionsResponse {
114    fn from_response(response: crate::Response) -> Option<Self> {
115        match response.kind {
116            crate::ResponseKind::SuperStreamPartitions(partitions_response) => {
117                Some(partitions_response)
118            }
119            _ => None,
120        }
121    }
122}
123
124#[cfg(test)]
125mod tests {
126
127    use crate::commands::tests::command_encode_decode_test;
128
129    use super::SuperStreamPartitionsRequest;
130    use super::SuperStreamPartitionsResponse;
131
132    #[test]
133    fn super_stream_partition_request_test() {
134        command_encode_decode_test::<SuperStreamPartitionsRequest>();
135    }
136
137    #[test]
138    fn super_stream_partition_response_test() {
139        command_encode_decode_test::<SuperStreamPartitionsResponse>();
140    }
141}