rabbitmq_stream_protocol/commands/
consumer_update_request.rs

1use std::io::Write;
2
3#[cfg(test)]
4use fake::Fake;
5
6use crate::{
7    codec::{Decoder, Encoder},
8    error::{DecodeError, EncodeError},
9    protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST,
10};
11
12use crate::commands::subscribe::OffsetSpecification;
13
14use super::Command;
15
16#[cfg_attr(test, derive(fake::Dummy))]
17#[derive(PartialEq, Eq, Debug)]
18pub struct ConsumerUpdateRequestCommand {
19    pub(crate) correlation_id: u32,
20    response_code: u16,
21    offset_specification: OffsetSpecification,
22}
23
24impl ConsumerUpdateRequestCommand {
25    pub fn new(
26        correlation_id: u32,
27        response_code: u16,
28        offset_specification: OffsetSpecification,
29    ) -> Self {
30        Self {
31            correlation_id,
32            response_code,
33            offset_specification,
34        }
35    }
36}
37
38impl Encoder for ConsumerUpdateRequestCommand {
39    fn encoded_size(&self) -> u32 {
40        self.correlation_id.encoded_size()
41            + self.response_code.encoded_size()
42            + self.offset_specification.encoded_size()
43    }
44
45    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
46        self.correlation_id.encode(writer)?;
47        self.response_code.encode(writer)?;
48        self.offset_specification.encode(writer)?;
49        Ok(())
50    }
51}
52
53impl Decoder for ConsumerUpdateRequestCommand {
54    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
55        let (input, correlation_id) = u32::decode(input)?;
56        let (input, response_code) = u16::decode(input)?;
57        let (input, offset_specification) = OffsetSpecification::decode(input)?;
58
59        Ok((
60            input,
61            ConsumerUpdateRequestCommand {
62                correlation_id,
63                response_code,
64                offset_specification,
65            },
66        ))
67    }
68}
69
70impl Command for ConsumerUpdateRequestCommand {
71    fn key(&self) -> u16 {
72        COMMAND_CONSUMER_UPDATE_REQUEST
73    }
74}
75
76#[cfg(test)]
77mod tests {
78    use crate::commands::tests::command_encode_decode_test;
79
80    use super::ConsumerUpdateRequestCommand;
81
82    #[test]
83    fn consumer_update_request_test() {
84        command_encode_decode_test::<ConsumerUpdateRequestCommand>();
85    }
86}