rabbitmq_stream_protocol/commands/
superstream_route.rs

1use std::io::Write;
2
3use crate::{
4    codec::{Decoder, Encoder},
5    error::{DecodeError, EncodeError},
6    protocol::commands::COMMAND_ROUTE,
7    FromResponse, ResponseCode,
8};
9
10use super::Command;
11
12#[cfg_attr(test, derive(fake::Dummy))]
13#[derive(PartialEq, Eq, Debug)]
14pub struct SuperStreamRouteRequest {
15    correlation_id: u32,
16    routing_key: String,
17    super_stream: String,
18}
19
20impl SuperStreamRouteRequest {
21    pub fn new(correlation_id: u32, routing_key: String, super_stream: String) -> Self {
22        Self {
23            correlation_id,
24            routing_key,
25            super_stream,
26        }
27    }
28}
29
30impl Encoder for SuperStreamRouteRequest {
31    fn encoded_size(&self) -> u32 {
32        self.correlation_id.encoded_size()
33            + self.routing_key.as_str().encoded_size()
34            + self.super_stream.as_str().encoded_size()
35    }
36
37    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
38        self.correlation_id.encode(writer)?;
39        self.routing_key.as_str().encode(writer)?;
40        self.super_stream.as_str().encode(writer)?;
41        Ok(())
42    }
43}
44
45impl Decoder for SuperStreamRouteRequest {
46    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
47        let (input, correlation_id) = u32::decode(input)?;
48        let (input, routing_key) = Option::decode(input)?;
49        let (input, super_stream) = Option::decode(input)?;
50
51        Ok((
52            input,
53            SuperStreamRouteRequest {
54                correlation_id,
55                routing_key: routing_key.unwrap(),
56                super_stream: super_stream.unwrap(),
57            },
58        ))
59    }
60}
61
62impl Command for SuperStreamRouteRequest {
63    fn key(&self) -> u16 {
64        COMMAND_ROUTE
65    }
66}
67
68#[cfg_attr(test, derive(fake::Dummy))]
69#[derive(PartialEq, Eq, Debug)]
70pub struct SuperStreamRouteResponse {
71    pub(crate) correlation_id: u32,
72    response_code: ResponseCode,
73    pub streams: Vec<String>,
74}
75
76impl SuperStreamRouteResponse {
77    pub fn new(correlation_id: u32, streams: Vec<String>, response_code: ResponseCode) -> Self {
78        Self {
79            correlation_id,
80            response_code,
81            streams,
82        }
83    }
84    pub fn is_ok(&self) -> bool {
85        self.response_code == ResponseCode::Ok
86    }
87}
88
89impl Encoder for SuperStreamRouteResponse {
90    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
91        self.correlation_id.encode(writer)?;
92        self.response_code.encode(writer)?;
93        self.streams.encode(writer)?;
94        Ok(())
95    }
96
97    fn encoded_size(&self) -> u32 {
98        self.correlation_id.encoded_size()
99            + self.streams.encoded_size()
100            + self.response_code.encoded_size()
101    }
102}
103
104impl Decoder for SuperStreamRouteResponse {
105    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
106        let (input, correlation_id) = u32::decode(input)?;
107        let (input, response_code) = ResponseCode::decode(input)?;
108        let (input, streams) = Vec::decode(input)?;
109
110        Ok((
111            input,
112            SuperStreamRouteResponse {
113                correlation_id,
114                response_code,
115                streams,
116            },
117        ))
118    }
119}
120
121impl FromResponse for SuperStreamRouteResponse {
122    fn from_response(response: crate::Response) -> Option<Self> {
123        match response.kind {
124            crate::ResponseKind::SuperStreamRoute(route) => Some(route),
125            _ => None,
126        }
127    }
128}
129
130#[cfg(test)]
131mod tests {
132
133    use crate::commands::tests::command_encode_decode_test;
134
135    use super::SuperStreamRouteRequest;
136    use super::SuperStreamRouteResponse;
137
138    #[test]
139    fn super_stream_route_request_test() {
140        command_encode_decode_test::<SuperStreamRouteRequest>();
141    }
142
143    #[test]
144    fn super_stream_route_response_test() {
145        command_encode_decode_test::<SuperStreamRouteResponse>();
146    }
147}