rabbitmq_stream_protocol/commands/
query_offset.rs1use crate::{
2 codec::{Decoder, Encoder},
3 error::{DecodeError, EncodeError},
4 protocol::commands::COMMAND_QUERY_OFFSET,
5 FromResponse, ResponseCode,
6};
7use std::io::Write;
8
9use super::Command;
10
11#[cfg_attr(test, derive(fake::Dummy))]
12#[derive(PartialEq, Eq, Debug)]
13pub struct QueryOffsetRequest {
14 correlation_id: u32,
15 reference: String,
16 stream: String,
17}
18
19impl QueryOffsetRequest {
20 pub fn new(correlation_id: u32, reference: String, stream: String) -> Self {
21 Self {
22 correlation_id,
23 reference,
24 stream,
25 }
26 }
27}
28
29impl Encoder for QueryOffsetRequest {
30 fn encoded_size(&self) -> u32 {
31 self.correlation_id.encoded_size()
32 + self.stream.as_str().encoded_size()
33 + self.reference.as_str().encoded_size()
34 }
35
36 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
37 let size = self.reference.len();
38 if size >= 256 {
39 return Err(EncodeError::MaxSizeError(size));
40 }
41
42 self.correlation_id.encode(writer)?;
43 self.reference.as_str().encode(writer)?;
44 self.stream.as_str().encode(writer)?;
45 Ok(())
46 }
47}
48
49impl Command for QueryOffsetRequest {
50 fn key(&self) -> u16 {
51 COMMAND_QUERY_OFFSET
52 }
53}
54
55impl Decoder for QueryOffsetRequest {
56 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
57 let (input, correlation_id) = u32::decode(input)?;
58 let (input, opt_reference) = Option::decode(input)?;
59
60 if let Some(reference) = opt_reference {
61 match reference.len() {
62 0..=255 => {
63 let (input, opt_stream) = Option::decode(input)?;
64
65 return Ok((
66 input,
67 QueryOffsetRequest {
68 correlation_id,
69 reference,
70 stream: opt_stream.unwrap(),
71 },
72 ));
73 }
74 size => return Err(DecodeError::MismatchSize(size)),
75 }
76 }
77
78 Err(DecodeError::Empty)
79 }
80}
81
82#[cfg_attr(test, derive(fake::Dummy))]
83#[derive(PartialEq, Eq, Debug)]
84pub struct QueryOffsetResponse {
85 pub(crate) correlation_id: u32,
86 response_code: ResponseCode,
87 offset: u64,
88}
89
90impl QueryOffsetResponse {
91 pub fn new(correlation_id: u32, response_code: ResponseCode, offset: u64) -> Self {
92 Self {
93 correlation_id,
94 response_code,
95 offset,
96 }
97 }
98
99 pub fn from_response(&self) -> u64 {
100 self.offset
101 }
102
103 pub fn code(&self) -> &ResponseCode {
104 &self.response_code
105 }
106
107 pub fn is_ok(&self) -> bool {
108 self.response_code == ResponseCode::Ok
109 }
110}
111
112impl Encoder for QueryOffsetResponse {
113 fn encoded_size(&self) -> u32 {
114 self.offset.encoded_size()
115 + self.correlation_id.encoded_size()
116 + self.response_code.encoded_size()
117 }
118
119 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
120 self.correlation_id.encode(writer)?;
121 self.response_code.encode(writer)?;
122 self.offset.encode(writer)?;
123 Ok(())
124 }
125}
126
127impl Decoder for QueryOffsetResponse {
128 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
129 let (input, correlation_id) = u32::decode(input)?;
130 let (input, response_code) = ResponseCode::decode(input)?;
131 let (input, offset) = u64::decode(input)?;
132 Ok((
133 input,
134 QueryOffsetResponse {
135 correlation_id,
136 response_code,
137 offset,
138 },
139 ))
140 }
141}
142
143impl FromResponse for QueryOffsetResponse {
144 fn from_response(response: crate::Response) -> Option<Self> {
145 match response.kind {
146 crate::ResponseKind::QueryOffset(query_offset) => Some(query_offset),
147 _ => None,
148 }
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use super::QueryOffsetRequest;
155 use super::QueryOffsetResponse;
156 use crate::commands::tests::command_encode_decode_test;
157
158 #[test]
159 fn query_offset_request_response_test() {
160 command_encode_decode_test::<QueryOffsetRequest>();
161 }
162
163 #[test]
164 fn query_offset_response_response_test() {
165 command_encode_decode_test::<QueryOffsetResponse>();
166 }
167}