rabbitmq_stream_protocol/commands/
superstream_partitions.rs

1use std::io::Write;
2
3#[cfg(test)]
4use fake::Fake;
5
6use super::Command;
7use crate::{
8    codec::{Decoder, Encoder},
9    error::{DecodeError, EncodeError},
10    protocol::commands::COMMAND_PARTITIONS,
11    FromResponse, ResponseCode,
12};
13
14#[cfg_attr(test, derive(fake::Dummy))]
15#[derive(PartialEq, Eq, Debug)]
16pub struct SuperStreamPartitionsRequest {
17    correlation_id: u32,
18    super_stream: String,
19}
20
21impl SuperStreamPartitionsRequest {
22    pub fn new(correlation_id: u32, super_stream: String) -> Self {
23        Self {
24            correlation_id,
25            super_stream,
26        }
27    }
28}
29
30impl Encoder for SuperStreamPartitionsRequest {
31    fn encoded_size(&self) -> u32 {
32        self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size()
33    }
34
35    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
36        self.correlation_id.encode(writer)?;
37        self.super_stream.as_str().encode(writer)?;
38        Ok(())
39    }
40}
41
42impl Decoder for SuperStreamPartitionsRequest {
43    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
44        let (input, correlation_id) = u32::decode(input)?;
45        let (input, super_stream) = Option::decode(input)?;
46
47        Ok((
48            input,
49            SuperStreamPartitionsRequest {
50                correlation_id,
51                super_stream: super_stream.unwrap(),
52            },
53        ))
54    }
55}
56
57impl Command for SuperStreamPartitionsRequest {
58    fn key(&self) -> u16 {
59        COMMAND_PARTITIONS
60    }
61}
62
63#[cfg_attr(test, derive(fake::Dummy))]
64#[derive(PartialEq, Eq, Debug)]
65pub struct SuperStreamPartitionsResponse {
66    pub(crate) correlation_id: u32,
67    response_code: ResponseCode,
68    pub streams: Vec<String>,
69}
70
71impl SuperStreamPartitionsResponse {
72    pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
73        Self {
74            correlation_id,
75            response_code,
76            streams,
77        }
78    }
79    pub fn is_ok(&self) -> bool {
80        self.response_code == ResponseCode::Ok
81    }
82}
83
84impl Encoder for SuperStreamPartitionsResponse {
85    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
86        self.correlation_id.encode(writer)?;
87        self.response_code.encode(writer)?;
88        self.streams.encode(writer)?;
89        Ok(())
90    }
91
92    fn encoded_size(&self) -> u32 {
93        self.correlation_id.encoded_size()
94            + self.response_code.encoded_size()
95            + self.streams.encoded_size()
96    }
97}
98
99impl Decoder for SuperStreamPartitionsResponse {
100    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
101        let (input, correlation_id) = u32::decode(input)?;
102        let (input, response_code) = ResponseCode::decode(input)?;
103        let (input, streams) = <Vec<String>>::decode(input)?;
104
105        Ok((
106            input,
107            SuperStreamPartitionsResponse {
108                correlation_id,
109                response_code,
110                streams,
111            },
112        ))
113    }
114}
115
116impl FromResponse for SuperStreamPartitionsResponse {
117    fn from_response(response: crate::Response) -> Option<Self> {
118        match response.kind {
119            crate::ResponseKind::SuperStreamPartitions(partitions_response) => {
120                Some(partitions_response)
121            }
122            _ => None,
123        }
124    }
125}
126
127#[cfg(test)]
128mod tests {
129
130    use crate::commands::tests::command_encode_decode_test;
131
132    use super::SuperStreamPartitionsRequest;
133    use super::SuperStreamPartitionsResponse;
134
135    #[test]
136    fn super_stream_partition_request_test() {
137        command_encode_decode_test::<SuperStreamPartitionsRequest>();
138    }
139
140    #[test]
141    fn super_stream_partition_response_test() {
142        command_encode_decode_test::<SuperStreamPartitionsResponse>();
143    }
144}