rabbitmq_stream_protocol/commands/
metadata.rs1use std::io::Write;
2
3use crate::{
4 codec::{decoder::read_vec, Decoder, Encoder},
5 error::{DecodeError, EncodeError},
6 protocol::commands::COMMAND_METADATA,
7 FromResponse, ResponseCode,
8};
9
10use super::Command;
11
12use byteorder::{BigEndian, WriteBytesExt};
13
14#[cfg_attr(test, derive(fake::Dummy))]
15#[derive(PartialEq, Eq, Debug)]
16pub struct MetadataCommand {
17 correlation_id: u32,
18 streams: Vec<String>,
19}
20
21impl MetadataCommand {
22 pub fn new(correlation_id: u32, streams: Vec<String>) -> Self {
23 Self {
24 correlation_id,
25 streams,
26 }
27 }
28}
29
30impl Encoder for MetadataCommand {
31 fn encoded_size(&self) -> u32 {
32 self.correlation_id.encoded_size() + self.streams.encoded_size()
33 }
34
35 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
36 self.correlation_id.encode(writer)?;
37 self.streams.encode(writer)?;
38 Ok(())
39 }
40}
41
42impl Decoder for MetadataCommand {
43 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
44 let (input, correlation_id) = u32::decode(input)?;
45 let (input, streams) = Vec::<String>::decode(input)?;
46
47 Ok((
48 input,
49 MetadataCommand {
50 correlation_id,
51 streams,
52 },
53 ))
54 }
55}
56
57impl Command for MetadataCommand {
58 fn key(&self) -> u16 {
59 COMMAND_METADATA
60 }
61}
62
63#[cfg_attr(test, derive(fake::Dummy))]
64#[derive(Debug, Eq, PartialEq)]
65pub struct MetadataResponse {
66 pub(crate) correlation_id: u32,
67 pub brokers: Vec<Broker>,
68 pub stream_metadata: Vec<StreamMetadata>,
69}
70
71#[cfg_attr(test, derive(fake::Dummy))]
72#[derive(Debug, PartialEq, Eq)]
73pub struct Broker {
74 pub reference: u16,
75 pub host: String,
76 pub port: u32,
77}
78
79#[cfg_attr(test, derive(fake::Dummy))]
80#[derive(Debug, PartialEq, Eq)]
81pub struct StreamMetadata {
82 pub stream_name: String,
83 pub code: ResponseCode,
84 pub leader_reference: u16,
85 pub replicas_references: Vec<u16>,
86}
87
88impl Decoder for Broker {
89 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
90 let (input, reference) = u16::decode(input)?;
91 let (input, host) = Option::decode(input)?;
92 let (input, port) = u32::decode(input)?;
93 Ok((
94 input,
95 Broker {
96 reference,
97 host: host.unwrap(),
98 port,
99 },
100 ))
101 }
102}
103impl Decoder for StreamMetadata {
104 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
105 let (input, stream_name) = Option::decode(input)?;
106 let (input, code) = ResponseCode::decode(input)?;
107 let (input, leader_reference) = u16::decode(input)?;
108 let (input, replicas_references) = Vec::decode(input)?;
109
110 Ok((
111 input,
112 StreamMetadata {
113 stream_name: stream_name.unwrap(),
114 code,
115 leader_reference,
116 replicas_references,
117 },
118 ))
119 }
120}
121
122impl Decoder for MetadataResponse {
123 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
124 let (input, correlation_id) = u32::decode(input)?;
125 let (input, brokers) = read_vec(input)?;
126 let (input, stream_metadata) = read_vec(input)?;
127 Ok((
128 input,
129 MetadataResponse {
130 correlation_id,
131 brokers,
132 stream_metadata,
133 },
134 ))
135 }
136}
137
138impl Encoder for Broker {
139 fn encoded_size(&self) -> u32 {
140 self.reference.encoded_size() + self.port.encoded_size() + self.host.as_str().encoded_size()
141 }
142
143 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
144 self.reference.encode(writer)?;
145 self.host.as_str().encode(writer)?;
146 self.port.encode(writer)?;
147 Ok(())
148 }
149}
150
151impl Encoder for StreamMetadata {
152 fn encoded_size(&self) -> u32 {
153 self.stream_name.as_str().encoded_size()
154 + self.code.encoded_size()
155 + self.leader_reference.encoded_size()
156 + self.replicas_references.encoded_size()
157 }
158
159 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
160 self.stream_name.as_str().encode(writer)?;
161 self.code.encode(writer)?;
162 self.leader_reference.encode(writer)?;
163 self.replicas_references.encode(writer)?;
164 Ok(())
165 }
166}
167
168impl Encoder for Vec<Broker> {
169 fn encoded_size(&self) -> u32 {
170 4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
171 }
172
173 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
174 writer.write_i32::<BigEndian>(self.len() as i32)?;
175 for x in self {
176 x.encode(writer)?;
177 }
178 Ok(())
179 }
180}
181
182impl Encoder for Vec<StreamMetadata> {
183 fn encoded_size(&self) -> u32 {
184 4 + self.iter().fold(0, |acc, v| acc + v.encoded_size())
185 }
186
187 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
188 writer.write_i32::<BigEndian>(self.len() as i32)?;
189 for x in self {
190 x.encode(writer)?;
191 }
192 Ok(())
193 }
194}
195
196impl Encoder for MetadataResponse {
197 fn encoded_size(&self) -> u32 {
198 0
199 }
200
201 fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), crate::error::EncodeError> {
202 self.correlation_id.encode(writer)?;
203 self.brokers.encode(writer)?;
204 self.stream_metadata.encode(writer)?;
205 Ok(())
206 }
207}
208
209impl FromResponse for MetadataResponse {
210 fn from_response(response: crate::Response) -> Option<Self> {
211 match response.kind {
212 crate::ResponseKind::Metadata(metadata) => Some(metadata),
213 _ => None,
214 }
215 }
216}
217#[cfg(test)]
218mod tests {
219
220 use crate::commands::metadata::{MetadataCommand, MetadataResponse};
221 use crate::commands::tests::command_encode_decode_test;
222
223 #[test]
224 fn metadata_request_test() {
225 command_encode_decode_test::<MetadataCommand>()
226 }
227
228 #[test]
229 fn metadata_response_test() {
230 command_encode_decode_test::<MetadataResponse>();
231 }
232}