rabbitmq_stream_protocol/commands/
query_offset.rs

1use crate::{
2    codec::{Decoder, Encoder},
3    error::{DecodeError, EncodeError},
4    protocol::commands::COMMAND_QUERY_OFFSET,
5    FromResponse, ResponseCode,
6};
7use std::io::Write;
8
9use super::Command;
10
/// Request frame for the query-offset command.
///
/// All fields are private; build a value with [`QueryOffsetRequest::new`].
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Debug, PartialEq, Eq)]
pub struct QueryOffsetRequest {
    /// Identifier carried in the frame; the matching response type in this
    /// file exposes a `correlation_id` field as well.
    correlation_id: u32,
    /// Reference string; encoding is rejected when it is 256 bytes or longer.
    // NOTE(review): presumably the consumer/offset-tracking name — confirm.
    reference: String,
    /// Name of the stream the query targets.
    stream: String,
}
18
19impl QueryOffsetRequest {
20    pub fn new(correlation_id: u32, reference: String, stream: String) -> Self {
21        Self {
22            correlation_id,
23            reference,
24            stream,
25        }
26    }
27}
28
29impl Encoder for QueryOffsetRequest {
30    fn encoded_size(&self) -> u32 {
31        self.correlation_id.encoded_size()
32            + self.stream.as_str().encoded_size()
33            + self.reference.as_str().encoded_size()
34    }
35
36    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
37        let size = self.reference.len();
38        if size >= 256 {
39            return Err(EncodeError::MaxSizeError(size));
40        }
41
42        self.correlation_id.encode(writer)?;
43        self.reference.as_str().encode(writer)?;
44        self.stream.as_str().encode(writer)?;
45        Ok(())
46    }
47}
48
49impl Command for QueryOffsetRequest {
50    fn key(&self) -> u16 {
51        COMMAND_QUERY_OFFSET
52    }
53}
54
55impl Decoder for QueryOffsetRequest {
56    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
57        let (input, correlation_id) = u32::decode(input)?;
58        let (input, opt_reference) = Option::decode(input)?;
59
60        if let Some(reference) = opt_reference {
61            match reference.len() {
62                0..=255 => {
63                    let (input, opt_stream) = Option::decode(input)?;
64
65                    return Ok((
66                        input,
67                        QueryOffsetRequest {
68                            correlation_id,
69                            reference,
70                            stream: opt_stream.unwrap(),
71                        },
72                    ));
73                }
74                size => return Err(DecodeError::MismatchSize(size)),
75            }
76        }
77
78        Err(DecodeError::Empty)
79    }
80}
81
82#[cfg_attr(test, derive(fake::Dummy))]
83#[derive(PartialEq, Eq, Debug)]
84pub struct QueryOffsetResponse {
85    pub(crate) correlation_id: u32,
86    response_code: ResponseCode,
87    offset: u64,
88}
89
90impl QueryOffsetResponse {
91    pub fn new(correlation_id: u32, response_code: ResponseCode, offset: u64) -> Self {
92        Self {
93            correlation_id,
94            response_code,
95            offset,
96        }
97    }
98
99    pub fn from_response(&self) -> u64 {
100        self.offset
101    }
102
103    pub fn code(&self) -> &ResponseCode {
104        &self.response_code
105    }
106
107    pub fn is_ok(&self) -> bool {
108        self.response_code == ResponseCode::Ok
109    }
110}
111
112impl Encoder for QueryOffsetResponse {
113    fn encoded_size(&self) -> u32 {
114        self.offset.encoded_size()
115            + self.correlation_id.encoded_size()
116            + self.response_code.encoded_size()
117    }
118
119    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
120        self.correlation_id.encode(writer)?;
121        self.response_code.encode(writer)?;
122        self.offset.encode(writer)?;
123        Ok(())
124    }
125}
126
127impl Decoder for QueryOffsetResponse {
128    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
129        let (input, correlation_id) = u32::decode(input)?;
130        let (input, response_code) = ResponseCode::decode(input)?;
131        let (input, offset) = u64::decode(input)?;
132        Ok((
133            input,
134            QueryOffsetResponse {
135                correlation_id,
136                response_code,
137                offset,
138            },
139        ))
140    }
141}
142
143impl FromResponse for QueryOffsetResponse {
144    fn from_response(response: crate::Response) -> Option<Self> {
145        match response.kind {
146            crate::ResponseKind::QueryOffset(query_offset) => Some(query_offset),
147            _ => None,
148        }
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::{QueryOffsetRequest, QueryOffsetResponse};
    use crate::commands::tests::command_encode_decode_test;

    // Both tests delegate to the shared `command_encode_decode_test` helper,
    // which is defined elsewhere in the crate.
    // NOTE(review): presumably an encode/decode round-trip on a generated
    // value — confirm against the helper.

    /// Runs the shared helper for the request type.
    #[test]
    fn query_offset_request_response_test() {
        command_encode_decode_test::<QueryOffsetRequest>();
    }

    /// Runs the shared helper for the response type.
    #[test]
    fn query_offset_response_response_test() {
        command_encode_decode_test::<QueryOffsetResponse>();
    }
}
167}