rabbitmq_stream_protocol/commands/
query_publisher_sequence.rs

1use crate::{
2    codec::{Decoder, Encoder},
3    error::{DecodeError, EncodeError},
4    protocol::commands::COMMAND_QUERY_PUBLISHER_SEQUENCE,
5    FromResponse, ResponseCode,
6};
7use std::io::Write;
8
9use super::Command;
10
11#[cfg_attr(test, derive(fake::Dummy))]
12#[derive(PartialEq, Eq, Debug)]
13pub struct QueryPublisherRequest {
14    correlation_id: u32,
15    publisher_reference: String,
16    stream: String,
17}
18
19impl QueryPublisherRequest {
20    pub fn new(correlation_id: u32, publisher_reference: String, stream: String) -> Self {
21        Self {
22            correlation_id,
23            publisher_reference,
24            stream,
25        }
26    }
27}
28
29impl Encoder for QueryPublisherRequest {
30    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
31        let size = self.publisher_reference.len();
32        if size >= 256 {
33            return Err(EncodeError::MaxSizeError(size));
34        }
35
36        self.correlation_id.encode(writer)?;
37        self.publisher_reference.as_str().encode(writer)?;
38        self.stream.as_str().encode(writer)?;
39        Ok(())
40    }
41
42    fn encoded_size(&self) -> u32 {
43        self.correlation_id.encoded_size()
44            + self.stream.as_str().encoded_size()
45            + self.publisher_reference.as_str().encoded_size()
46    }
47}
48
49impl Command for QueryPublisherRequest {
50    fn key(&self) -> u16 {
51        COMMAND_QUERY_PUBLISHER_SEQUENCE
52    }
53}
54
55impl Decoder for QueryPublisherRequest {
56    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
57        let (input, correlation_id) = u32::decode(input)?;
58        let (input, opt_publisher_reference) = Option::decode(input)?;
59
60        if let Some(publisher_reference) = opt_publisher_reference {
61            match publisher_reference.len() {
62                0..=255 => {
63                    let (input, opt_stream) = Option::decode(input)?;
64
65                    return Ok((
66                        input,
67                        QueryPublisherRequest {
68                            correlation_id,
69                            publisher_reference,
70                            stream: opt_stream.unwrap(),
71                        },
72                    ));
73                }
74                size => return Err(DecodeError::MismatchSize(size)),
75            }
76        }
77
78        Err(DecodeError::Empty)
79    }
80}
81
82#[cfg_attr(test, derive(fake::Dummy))]
83#[derive(PartialEq, Eq, Debug)]
84pub struct QueryPublisherResponse {
85    pub(crate) correlation_id: u32,
86    response_code: ResponseCode,
87    sequence: u64,
88}
89
90impl QueryPublisherResponse {
91    pub fn new(correlation_id: u32, response_code: ResponseCode, sequence: u64) -> Self {
92        Self {
93            correlation_id,
94            response_code,
95            sequence,
96        }
97    }
98
99    pub fn from_response(&self) -> u64 {
100        self.sequence
101    }
102}
103
104impl Encoder for QueryPublisherResponse {
105    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
106        self.correlation_id.encode(writer)?;
107        self.response_code.encode(writer)?;
108        self.sequence.encode(writer)?;
109        Ok(())
110    }
111
112    fn encoded_size(&self) -> u32 {
113        self.sequence.encoded_size()
114            + self.correlation_id.encoded_size()
115            + self.response_code.encoded_size()
116    }
117}
118
119impl Decoder for QueryPublisherResponse {
120    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
121        let (input, correlation_id) = u32::decode(input)?;
122        let (input, response_code) = ResponseCode::decode(input)?;
123        let (input, sequence) = u64::decode(input)?;
124        Ok((
125            input,
126            QueryPublisherResponse {
127                correlation_id,
128                response_code,
129                sequence,
130            },
131        ))
132    }
133}
134
135impl FromResponse for QueryPublisherResponse {
136    fn from_response(response: crate::Response) -> Option<Self> {
137        match response.kind {
138            crate::ResponseKind::QueryPublisherSequence(query_offset) => Some(query_offset),
139            _ => None,
140        }
141    }
142}
143
144#[cfg(test)]
145mod tests {
146    use super::QueryPublisherRequest;
147    use super::QueryPublisherResponse;
148    use crate::commands::tests::command_encode_decode_test;
149
150    #[test]
151    fn query_publisher_request_test() {
152        command_encode_decode_test::<QueryPublisherRequest>();
153    }
154
155    #[test]
156    fn query_publisher_response_test() {
157        command_encode_decode_test::<QueryPublisherResponse>();
158    }
159}