rabbitmq_stream_protocol/commands/
superstream_partitions.rs1use std::io::Write;
2
3#[cfg(test)]
4use fake::Fake;
5
6use super::Command;
7use crate::{
8 codec::{Decoder, Encoder},
9 error::{DecodeError, EncodeError},
10 protocol::commands::COMMAND_PARTITIONS,
11 FromResponse, ResponseCode,
12};
13
14#[cfg_attr(test, derive(fake::Dummy))]
15#[derive(PartialEq, Eq, Debug)]
16pub struct SuperStreamPartitionsRequest {
17 correlation_id: u32,
18 super_stream: String,
19}
20
21impl SuperStreamPartitionsRequest {
22 pub fn new(correlation_id: u32, super_stream: String) -> Self {
23 Self {
24 correlation_id,
25 super_stream,
26 }
27 }
28}
29
30impl Encoder for SuperStreamPartitionsRequest {
31 fn encoded_size(&self) -> u32 {
32 self.correlation_id.encoded_size() + self.super_stream.as_str().encoded_size()
33 }
34
35 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
36 self.correlation_id.encode(writer)?;
37 self.super_stream.as_str().encode(writer)?;
38 Ok(())
39 }
40}
41
42impl Decoder for SuperStreamPartitionsRequest {
43 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
44 let (input, correlation_id) = u32::decode(input)?;
45 let (input, super_stream) = Option::decode(input)?;
46
47 Ok((
48 input,
49 SuperStreamPartitionsRequest {
50 correlation_id,
51 super_stream: super_stream.unwrap(),
52 },
53 ))
54 }
55}
56
57impl Command for SuperStreamPartitionsRequest {
58 fn key(&self) -> u16 {
59 COMMAND_PARTITIONS
60 }
61}
62
63#[cfg_attr(test, derive(fake::Dummy))]
64#[derive(PartialEq, Eq, Debug)]
65pub struct SuperStreamPartitionsResponse {
66 pub(crate) correlation_id: u32,
67 response_code: ResponseCode,
68 pub streams: Vec<String>,
69}
70
71impl SuperStreamPartitionsResponse {
72 pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
73 Self {
74 correlation_id,
75 response_code,
76 streams,
77 }
78 }
79 pub fn is_ok(&self) -> bool {
80 self.response_code == ResponseCode::Ok
81 }
82}
83
84impl Encoder for SuperStreamPartitionsResponse {
85 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
86 self.correlation_id.encode(writer)?;
87 self.response_code.encode(writer)?;
88 self.streams.encode(writer)?;
89 Ok(())
90 }
91
92 fn encoded_size(&self) -> u32 {
93 self.correlation_id.encoded_size()
94 + self.response_code.encoded_size()
95 + self.streams.encoded_size()
96 }
97}
98
99impl Decoder for SuperStreamPartitionsResponse {
100 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
101 let (input, correlation_id) = u32::decode(input)?;
102 let (input, response_code) = ResponseCode::decode(input)?;
103 let (input, streams) = <Vec<String>>::decode(input)?;
104
105 Ok((
106 input,
107 SuperStreamPartitionsResponse {
108 correlation_id,
109 response_code,
110 streams,
111 },
112 ))
113 }
114}
115
116impl FromResponse for SuperStreamPartitionsResponse {
117 fn from_response(response: crate::Response) -> Option<Self> {
118 match response.kind {
119 crate::ResponseKind::SuperStreamPartitions(partitions_response) => {
120 Some(partitions_response)
121 }
122 _ => None,
123 }
124 }
125}
126
127#[cfg(test)]
128mod tests {
129
130 use crate::commands::tests::command_encode_decode_test;
131
132 use super::SuperStreamPartitionsRequest;
133 use super::SuperStreamPartitionsResponse;
134
135 #[test]
136 fn super_stream_partition_request_test() {
137 command_encode_decode_test::<SuperStreamPartitionsRequest>();
138 }
139
140 #[test]
141 fn super_stream_partition_response_test() {
142 command_encode_decode_test::<SuperStreamPartitionsResponse>();
143 }
144}