rabbitmq_stream_protocol/commands/
superstream_route.rs1use std::io::Write;
2
3use crate::{
4 codec::{Decoder, Encoder},
5 error::{DecodeError, EncodeError},
6 protocol::commands::COMMAND_ROUTE,
7 FromResponse, ResponseCode,
8};
9
10use super::Command;
11
/// Request frame for the route command (`COMMAND_ROUTE`).
///
/// Holds a correlation id together with a routing key and a super-stream
/// name. All fields are private; build instances with
/// [`SuperStreamRouteRequest::new`].
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Debug, PartialEq, Eq)]
pub struct SuperStreamRouteRequest {
    /// Identifier carried in the request frame.
    correlation_id: u32,
    /// Routing key sent with the request.
    routing_key: String,
    /// Name of the super stream sent with the request.
    super_stream: String,
}
19
20impl SuperStreamRouteRequest {
21 pub fn new(correlation_id: u32, routing_key: String, super_stream: String) -> Self {
22 Self {
23 correlation_id,
24 routing_key,
25 super_stream,
26 }
27 }
28}
29
30impl Encoder for SuperStreamRouteRequest {
31 fn encoded_size(&self) -> u32 {
32 self.correlation_id.encoded_size()
33 + self.routing_key.as_str().encoded_size()
34 + self.super_stream.as_str().encoded_size()
35 }
36
37 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
38 self.correlation_id.encode(writer)?;
39 self.routing_key.as_str().encode(writer)?;
40 self.super_stream.as_str().encode(writer)?;
41 Ok(())
42 }
43}
44
45impl Decoder for SuperStreamRouteRequest {
46 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
47 let (input, correlation_id) = u32::decode(input)?;
48 let (input, routing_key) = Option::decode(input)?;
49 let (input, super_stream) = Option::decode(input)?;
50
51 Ok((
52 input,
53 SuperStreamRouteRequest {
54 correlation_id,
55 routing_key: routing_key.unwrap(),
56 super_stream: super_stream.unwrap(),
57 },
58 ))
59 }
60}
61
62impl Command for SuperStreamRouteRequest {
63 fn key(&self) -> u16 {
64 COMMAND_ROUTE
65 }
66}
67
68#[cfg_attr(test, derive(fake::Dummy))]
69#[derive(PartialEq, Eq, Debug)]
70pub struct SuperStreamRouteResponse {
71 pub(crate) correlation_id: u32,
72 response_code: ResponseCode,
73 pub streams: Vec<String>,
74}
75
76impl SuperStreamRouteResponse {
77 pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
78 Self {
79 correlation_id,
80 response_code,
81 streams,
82 }
83 }
84 pub fn is_ok(&self) -> bool {
85 self.response_code == ResponseCode::Ok
86 }
87}
88
89impl Encoder for SuperStreamRouteResponse {
90 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
91 self.correlation_id.encode(writer)?;
92 self.response_code.encode(writer)?;
93 self.streams.encode(writer)?;
94 Ok(())
95 }
96
97 fn encoded_size(&self) -> u32 {
98 self.correlation_id.encoded_size()
99 + self.streams.encoded_size()
100 + self.response_code.encoded_size()
101 }
102}
103
104impl Decoder for SuperStreamRouteResponse {
105 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
106 let (input, correlation_id) = u32::decode(input)?;
107 let (input, response_code) = ResponseCode::decode(input)?;
108 let (input, streams) = Vec::decode(input)?;
109
110 Ok((
111 input,
112 SuperStreamRouteResponse {
113 correlation_id,
114 response_code,
115 streams,
116 },
117 ))
118 }
119}
120
121impl FromResponse for SuperStreamRouteResponse {
122 fn from_response(response: crate::Response) -> Option<Self> {
123 match response.kind {
124 crate::ResponseKind::SuperStreamRoute(route) => Some(route),
125 _ => None,
126 }
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::{SuperStreamRouteRequest, SuperStreamRouteResponse};
    use crate::commands::tests::command_encode_decode_test;

    // Runs the shared encode/decode command test helper for the request type.
    #[test]
    fn super_stream_route_request_test() {
        command_encode_decode_test::<SuperStreamRouteRequest>();
    }

    // Runs the shared encode/decode command test helper for the response type.
    #[test]
    fn super_stream_route_response_test() {
        command_encode_decode_test::<SuperStreamRouteResponse>();
    }
}
147}