rabbitmq_stream_protocol/commands/
consumer_update.rs

1use std::io::Write;
2
3use crate::{
4    codec::{Decoder, Encoder},
5    error::{DecodeError, EncodeError},
6    protocol::commands::COMMAND_CONSUMER_UPDATE,
7};
8
9use super::Command;
10
11#[cfg_attr(test, derive(fake::Dummy))]
12#[derive(PartialEq, Eq, Debug)]
13pub struct ConsumerUpdateCommand {
14    pub(crate) correlation_id: u32,
15    subscription_id: u8,
16    active: u8,
17}
18
19impl ConsumerUpdateCommand {
20    pub fn new(correlation_id: u32, subscription_id: u8, active: u8) -> Self {
21        Self {
22            correlation_id,
23            subscription_id,
24            active,
25        }
26    }
27
28    pub fn get_correlation_id(&self) -> u32 {
29        self.correlation_id
30    }
31
32    pub fn is_active(&self) -> u8 {
33        self.active
34    }
35}
36
37impl Encoder for ConsumerUpdateCommand {
38    fn encoded_size(&self) -> u32 {
39        self.correlation_id.encoded_size()
40            + self.subscription_id.encoded_size()
41            + self.active.encoded_size()
42    }
43
44    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
45        self.correlation_id.encode(writer)?;
46        self.subscription_id.encode(writer)?;
47        self.active.encode(writer)?;
48        Ok(())
49    }
50}
51
52impl Decoder for ConsumerUpdateCommand {
53    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
54        let (input, correlation_id) = u32::decode(input)?;
55        let (input, subscription_id) = u8::decode(input)?;
56        let (input, active) = u8::decode(input)?;
57
58        Ok((
59            input,
60            ConsumerUpdateCommand {
61                correlation_id,
62                subscription_id,
63                active,
64            },
65        ))
66    }
67}
68
69impl Command for ConsumerUpdateCommand {
70    fn key(&self) -> u16 {
71        COMMAND_CONSUMER_UPDATE
72    }
73}
74
75#[cfg(test)]
76mod tests {
77    use crate::commands::tests::command_encode_decode_test;
78
79    use super::ConsumerUpdateCommand;
80
81    #[test]
82    fn consumer_update_response_test() {
83        command_encode_decode_test::<ConsumerUpdateCommand>();
84    }
85}