rabbitmq_stream_protocol/commands/
exchange_command_versions.rs1use std::io::Write;
2
3use crate::{
4 codec::{decoder::read_vec, Decoder, Encoder},
5 error::{DecodeError, EncodeError},
6 protocol::commands::COMMAND_EXCHANGE_COMMAND_VERSIONS,
7 response::{FromResponse, ResponseCode},
8};
9
10use super::Command;
11use byteorder::{BigEndian, WriteBytesExt};
12
/// Version range associated with a single command key.
///
/// The three positional fields are, in order: the command key, the minimum
/// version and the maximum version. This is the same order in which they are
/// passed to the constructor and in which they are encoded and decoded.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Debug, PartialEq, Eq)]
pub struct ExchangeCommandVersion(u16, u16, u16);
16
17impl ExchangeCommandVersion {
18 pub fn new(key: u16, min_version: u16, max_version: u16) -> Self {
19 ExchangeCommandVersion(key, min_version, max_version)
20 }
21}
22
23impl Encoder for ExchangeCommandVersion {
24 fn encoded_size(&self) -> u32 {
25 self.0.encoded_size() + self.1.encoded_size() + self.2.encoded_size()
26 }
27
28 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
29 self.0.encode(writer)?;
30 self.1.encode(writer)?;
31 self.2.encode(writer)?;
32
33 Ok(())
34 }
35}
36
37impl Decoder for ExchangeCommandVersion {
38 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
39 let (input, key) = u16::decode(input)?;
40 let (input, min_version) = u16::decode(input)?;
41 let (input, max_version) = u16::decode(input)?;
42 Ok((input, ExchangeCommandVersion(key, min_version, max_version)))
43 }
44}
45
46impl Encoder for Vec<ExchangeCommandVersion> {
47 fn encoded_size(&self) -> u32 {
48 4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
49 }
50
51 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
52 writer.write_u32::<BigEndian>(self.len() as u32)?;
53 for x in self {
54 x.encode(writer)?;
55 }
56 Ok(())
57 }
58}
59
60impl Decoder for Vec<ExchangeCommandVersion> {
61 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
62 let (input, result) = read_vec(input)?;
63 Ok((input, result))
64 }
65}
66
67#[cfg_attr(test, derive(fake::Dummy))]
68#[derive(PartialEq, Eq, Debug)]
69pub struct ExchangeCommandVersionsRequest {
70 pub(crate) correlation_id: u32,
71 commands: Vec<ExchangeCommandVersion>,
72}
73
74impl ExchangeCommandVersionsRequest {
75 pub fn new(correlation_id: u32, commands: Vec<ExchangeCommandVersion>) -> Self {
76 Self {
77 correlation_id,
78 commands,
79 }
80 }
81}
82
83impl Encoder for ExchangeCommandVersionsRequest {
84 fn encoded_size(&self) -> u32 {
85 self.correlation_id.encoded_size() + self.commands.encoded_size()
86 }
87
88 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
89 self.correlation_id.encode(writer)?;
90 self.commands.encode(writer)?;
91 Ok(())
92 }
93}
94
95impl Command for ExchangeCommandVersionsRequest {
96 fn key(&self) -> u16 {
97 COMMAND_EXCHANGE_COMMAND_VERSIONS
98 }
99}
100
101impl Decoder for ExchangeCommandVersionsRequest {
102 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
103 let (input, correlation_id) = u32::decode(input)?;
104 let (input, commands) = <Vec<ExchangeCommandVersion>>::decode(input)?;
105 Ok((
106 input,
107 ExchangeCommandVersionsRequest {
108 correlation_id,
109 commands,
110 },
111 ))
112 }
113}
114
115#[cfg_attr(test, derive(fake::Dummy))]
116#[derive(PartialEq, Eq, Debug)]
117pub struct ExchangeCommandVersionsResponse {
118 pub(crate) correlation_id: u32,
119 response_code: ResponseCode,
120 commands: Vec<ExchangeCommandVersion>,
121}
122
123impl ExchangeCommandVersionsResponse {
124 pub fn new(
125 correlation_id: u32,
126 response_code: ResponseCode,
127 commands: Vec<ExchangeCommandVersion>,
128 ) -> Self {
129 Self {
130 correlation_id,
131 response_code,
132 commands,
133 }
134 }
135
136 pub fn code(&self) -> &ResponseCode {
137 &self.response_code
138 }
139
140 pub fn is_ok(&self) -> bool {
141 self.response_code == ResponseCode::Ok
142 }
143
144 pub fn key_version(&self, key_command: u16) -> (u16, u16) {
145 for i in &self.commands {
146 match i {
147 ExchangeCommandVersion(match_key_command, min_version, max_version) => {
148 if *match_key_command == key_command {
149 return (*min_version, *max_version);
150 }
151 }
152 }
153 }
154
155 (1, 1)
156 }
157}
158
159impl Encoder for ExchangeCommandVersionsResponse {
160 fn encoded_size(&self) -> u32 {
161 self.correlation_id.encoded_size()
162 + self.response_code.encoded_size()
163 + self.commands.encoded_size()
164 }
165
166 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
167 self.correlation_id.encode(writer)?;
168 self.response_code.encode(writer)?;
169 self.commands.encode(writer)?;
170 Ok(())
171 }
172}
173
174impl Decoder for ExchangeCommandVersionsResponse {
175 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
176 let (input, correlation_id) = u32::decode(input)?;
177 let (input, response_code) = ResponseCode::decode(input)?;
178 let (input, commands) = <Vec<ExchangeCommandVersion>>::decode(input)?;
179
180 Ok((
181 input,
182 ExchangeCommandVersionsResponse {
183 correlation_id,
184 response_code,
185 commands,
186 },
187 ))
188 }
189}
190
191impl FromResponse for ExchangeCommandVersionsResponse {
192 fn from_response(response: crate::Response) -> Option<Self> {
193 match response.kind {
194 crate::ResponseKind::ExchangeCommandVersions(exchange_command_versions) => {
195 Some(exchange_command_versions)
196 }
197 _ => None,
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::{ExchangeCommandVersionsRequest, ExchangeCommandVersionsResponse};
    use crate::commands::tests::command_encode_decode_test;

    // Runs the shared `command_encode_decode_test` helper for the request type.
    #[test]
    fn exchange_command_versions_request_test() {
        command_encode_decode_test::<ExchangeCommandVersionsRequest>();
    }

    // Runs the shared `command_encode_decode_test` helper for the response type.
    #[test]
    fn exchange_command_versions_response_test() {
        command_encode_decode_test::<ExchangeCommandVersionsResponse>();
    }
}
218}