rabbitmq_stream_protocol/commands/
consumer_update_request.rs

1use std::io::Write;
2
3use crate::{
4    codec::{Decoder, Encoder},
5    error::{DecodeError, EncodeError},
6    protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST,
7};
8
9use crate::commands::subscribe::OffsetSpecification;
10
11use super::Command;
12
13#[cfg_attr(test, derive(fake::Dummy))]
14#[derive(PartialEq, Eq, Debug)]
15pub struct ConsumerUpdateRequestCommand {
16    pub(crate) correlation_id: u32,
17    response_code: u16,
18    offset_specification: OffsetSpecification,
19}
20
21impl ConsumerUpdateRequestCommand {
22    pub fn new(
23        correlation_id: u32,
24        response_code: u16,
25        offset_specification: OffsetSpecification,
26    ) -> Self {
27        Self {
28            correlation_id,
29            response_code,
30            offset_specification,
31        }
32    }
33}
34
35impl Encoder for ConsumerUpdateRequestCommand {
36    fn encoded_size(&self) -> u32 {
37        self.correlation_id.encoded_size()
38            + self.response_code.encoded_size()
39            + self.offset_specification.encoded_size()
40    }
41
42    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
43        self.correlation_id.encode(writer)?;
44        self.response_code.encode(writer)?;
45        self.offset_specification.encode(writer)?;
46        Ok(())
47    }
48}
49
50impl Decoder for ConsumerUpdateRequestCommand {
51    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
52        let (input, correlation_id) = u32::decode(input)?;
53        let (input, response_code) = u16::decode(input)?;
54        let (input, offset_specification) = OffsetSpecification::decode(input)?;
55
56        Ok((
57            input,
58            ConsumerUpdateRequestCommand {
59                correlation_id,
60                response_code,
61                offset_specification,
62            },
63        ))
64    }
65}
66
impl Command for ConsumerUpdateRequestCommand {
    /// Returns the command key for this type, `COMMAND_CONSUMER_UPDATE_REQUEST`.
    fn key(&self) -> u16 {
        COMMAND_CONSUMER_UPDATE_REQUEST
    }
}
72
#[cfg(test)]
mod tests {
    use super::ConsumerUpdateRequestCommand;

    use crate::commands::tests::command_encode_decode_test;

    // NOTE(review): `command_encode_decode_test` is defined elsewhere; presumably it
    // round-trips a generated command through `encode`/`decode` — confirm.
    #[test]
    fn consumer_update_request_test() {
        type Subject = ConsumerUpdateRequestCommand;

        command_encode_decode_test::<Subject>();
    }
}