rabbitmq_stream_protocol/commands/
query_publisher_sequence.rs1use crate::{
2 codec::{Decoder, Encoder},
3 error::{DecodeError, EncodeError},
4 protocol::commands::COMMAND_QUERY_PUBLISHER_SEQUENCE,
5 FromResponse, ResponseCode,
6};
7use std::io::Write;
8
9use super::Command;
10
11#[cfg_attr(test, derive(fake::Dummy))]
12#[derive(PartialEq, Eq, Debug)]
13pub struct QueryPublisherRequest {
14 correlation_id: u32,
15 publisher_reference: String,
16 stream: String,
17}
18
19impl QueryPublisherRequest {
20 pub fn new(correlation_id: u32, publisher_reference: String, stream: String) -> Self {
21 Self {
22 correlation_id,
23 publisher_reference,
24 stream,
25 }
26 }
27}
28
29impl Encoder for QueryPublisherRequest {
30 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
31 let size = self.publisher_reference.len();
32 if size >= 256 {
33 return Err(EncodeError::MaxSizeError(size));
34 }
35
36 self.correlation_id.encode(writer)?;
37 self.publisher_reference.as_str().encode(writer)?;
38 self.stream.as_str().encode(writer)?;
39 Ok(())
40 }
41
42 fn encoded_size(&self) -> u32 {
43 self.correlation_id.encoded_size()
44 + self.stream.as_str().encoded_size()
45 + self.publisher_reference.as_str().encoded_size()
46 }
47}
48
49impl Command for QueryPublisherRequest {
50 fn key(&self) -> u16 {
51 COMMAND_QUERY_PUBLISHER_SEQUENCE
52 }
53}
54
55impl Decoder for QueryPublisherRequest {
56 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
57 let (input, correlation_id) = u32::decode(input)?;
58 let (input, opt_publisher_reference) = Option::decode(input)?;
59
60 if let Some(publisher_reference) = opt_publisher_reference {
61 match publisher_reference.len() {
62 0..=255 => {
63 let (input, opt_stream) = Option::decode(input)?;
64
65 return Ok((
66 input,
67 QueryPublisherRequest {
68 correlation_id,
69 publisher_reference,
70 stream: opt_stream.unwrap(),
71 },
72 ));
73 }
74 size => return Err(DecodeError::MismatchSize(size)),
75 }
76 }
77
78 Err(DecodeError::Empty)
79 }
80}
81
82#[cfg_attr(test, derive(fake::Dummy))]
83#[derive(PartialEq, Eq, Debug)]
84pub struct QueryPublisherResponse {
85 pub(crate) correlation_id: u32,
86 response_code: ResponseCode,
87 sequence: u64,
88}
89
90impl QueryPublisherResponse {
91 pub fn new(correlation_id: u32, response_code: ResponseCode, sequence: u64) -> Self {
92 Self {
93 correlation_id,
94 response_code,
95 sequence,
96 }
97 }
98
99 pub fn from_response(&self) -> u64 {
100 self.sequence
101 }
102}
103
104impl Encoder for QueryPublisherResponse {
105 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
106 self.correlation_id.encode(writer)?;
107 self.response_code.encode(writer)?;
108 self.sequence.encode(writer)?;
109 Ok(())
110 }
111
112 fn encoded_size(&self) -> u32 {
113 self.sequence.encoded_size()
114 + self.correlation_id.encoded_size()
115 + self.response_code.encoded_size()
116 }
117}
118
119impl Decoder for QueryPublisherResponse {
120 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
121 let (input, correlation_id) = u32::decode(input)?;
122 let (input, response_code) = ResponseCode::decode(input)?;
123 let (input, sequence) = u64::decode(input)?;
124 Ok((
125 input,
126 QueryPublisherResponse {
127 correlation_id,
128 response_code,
129 sequence,
130 },
131 ))
132 }
133}
134
135impl FromResponse for QueryPublisherResponse {
136 fn from_response(response: crate::Response) -> Option<Self> {
137 match response.kind {
138 crate::ResponseKind::QueryPublisherSequence(query_offset) => Some(query_offset),
139 _ => None,
140 }
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use super::QueryPublisherRequest;
147 use super::QueryPublisherResponse;
148 use crate::commands::tests::command_encode_decode_test;
149
150 #[test]
151 fn query_publisher_request_test() {
152 command_encode_decode_test::<QueryPublisherRequest>();
153 }
154
155 #[test]
156 fn query_publisher_response_test() {
157 command_encode_decode_test::<QueryPublisherResponse>();
158 }
159}