rabbitmq_stream_protocol/commands/
consumer_update.rs1use std::io::Write;
2
3use crate::{
4 codec::{Decoder, Encoder},
5 error::{DecodeError, EncodeError},
6 protocol::commands::COMMAND_CONSUMER_UPDATE,
7};
8
9use super::Command;
10
11#[cfg_attr(test, derive(fake::Dummy))]
12#[derive(PartialEq, Eq, Debug)]
13pub struct ConsumerUpdateCommand {
14 pub(crate) correlation_id: u32,
15 subscription_id: u8,
16 active: u8,
17}
18
19impl ConsumerUpdateCommand {
20 pub fn new(correlation_id: u32, subscription_id: u8, active: u8) -> Self {
21 Self {
22 correlation_id,
23 subscription_id,
24 active,
25 }
26 }
27
28 pub fn get_correlation_id(&self) -> u32 {
29 self.correlation_id
30 }
31
32 pub fn is_active(&self) -> u8 {
33 self.active
34 }
35}
36
37impl Encoder for ConsumerUpdateCommand {
38 fn encoded_size(&self) -> u32 {
39 self.correlation_id.encoded_size()
40 + self.subscription_id.encoded_size()
41 + self.active.encoded_size()
42 }
43
44 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
45 self.correlation_id.encode(writer)?;
46 self.subscription_id.encode(writer)?;
47 self.active.encode(writer)?;
48 Ok(())
49 }
50}
51
52impl Decoder for ConsumerUpdateCommand {
53 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
54 let (input, correlation_id) = u32::decode(input)?;
55 let (input, subscription_id) = u8::decode(input)?;
56 let (input, active) = u8::decode(input)?;
57
58 Ok((
59 input,
60 ConsumerUpdateCommand {
61 correlation_id,
62 subscription_id,
63 active,
64 },
65 ))
66 }
67}
68
69impl Command for ConsumerUpdateCommand {
70 fn key(&self) -> u16 {
71 COMMAND_CONSUMER_UPDATE
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use crate::commands::tests::command_encode_decode_test;
78
79 use super::ConsumerUpdateCommand;
80
81 #[test]
82 fn consumer_update_response_test() {
83 command_encode_decode_test::<ConsumerUpdateCommand>();
84 }
85}