rabbitmq_stream_protocol/commands/
consumer_update_request.rs1use std::io::Write;
2
3#[cfg(test)]
4use fake::Fake;
5
6use crate::{
7 codec::{Decoder, Encoder},
8 error::{DecodeError, EncodeError},
9 protocol::commands::COMMAND_CONSUMER_UPDATE_REQUEST,
10};
11
12use crate::commands::subscribe::OffsetSpecification;
13
14use super::Command;
15
16#[cfg_attr(test, derive(fake::Dummy))]
17#[derive(PartialEq, Eq, Debug)]
18pub struct ConsumerUpdateRequestCommand {
19 pub(crate) correlation_id: u32,
20 response_code: u16,
21 offset_specification: OffsetSpecification,
22}
23
24impl ConsumerUpdateRequestCommand {
25 pub fn new(
26 correlation_id: u32,
27 response_code: u16,
28 offset_specification: OffsetSpecification,
29 ) -> Self {
30 Self {
31 correlation_id,
32 response_code,
33 offset_specification,
34 }
35 }
36}
37
38impl Encoder for ConsumerUpdateRequestCommand {
39 fn encoded_size(&self) -> u32 {
40 self.correlation_id.encoded_size()
41 + self.response_code.encoded_size()
42 + self.offset_specification.encoded_size()
43 }
44
45 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
46 self.correlation_id.encode(writer)?;
47 self.response_code.encode(writer)?;
48 self.offset_specification.encode(writer)?;
49 Ok(())
50 }
51}
52
53impl Decoder for ConsumerUpdateRequestCommand {
54 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
55 let (input, correlation_id) = u32::decode(input)?;
56 let (input, response_code) = u16::decode(input)?;
57 let (input, offset_specification) = OffsetSpecification::decode(input)?;
58
59 Ok((
60 input,
61 ConsumerUpdateRequestCommand {
62 correlation_id,
63 response_code,
64 offset_specification,
65 },
66 ))
67 }
68}
69
70impl Command for ConsumerUpdateRequestCommand {
71 fn key(&self) -> u16 {
72 COMMAND_CONSUMER_UPDATE_REQUEST
73 }
74}
75
76#[cfg(test)]
77mod tests {
78 use crate::commands::tests::command_encode_decode_test;
79
80 use super::ConsumerUpdateRequestCommand;
81
82 #[test]
83 fn consumer_update_request_test() {
84 command_encode_decode_test::<ConsumerUpdateRequestCommand>();
85 }
86}