rabbitmq_stream_protocol/commands/
deliver.rs

1use std::io::Write;
2
3use super::Command;
4use crate::codec::decoder::read_vec;
5use crate::message::Message;
6use crate::{
7    codec::{Decoder, Encoder},
8    error::{DecodeError, EncodeError},
9    protocol::commands::COMMAND_DELIVER,
10};
11use byteorder::{BigEndian, WriteBytesExt};
12
13#[cfg_attr(test, derive(fake::Dummy))]
14#[derive(PartialEq, Eq, Debug, Clone)]
15pub struct DeliverCommand {
16    pub subscription_id: u8,
17    magic_version: i8,
18    chunk_type: u8,
19    num_entries: u16,
20    timestamp: u64,
21    epoch: u64,
22    pub chunk_first_offset: u64,
23    chunk_crc: i32,
24    trailer_length: u32,
25    reserved: u32,
26    pub messages: Vec<Message>,
27}
28
29#[allow(clippy::too_many_arguments)]
30impl DeliverCommand {
31    pub fn new(
32        subscription_id: u8,
33        magic_version: i8,
34        chunk_type: u8,
35        num_entries: u16,
36        timestamp: u64,
37        epoch: u64,
38        chunk_first_offset: u64,
39        chunk_crc: i32,
40        trailer_length: u32,
41        reserved: u32,
42        messages: Vec<Message>,
43    ) -> Self {
44        Self {
45            subscription_id,
46            magic_version,
47            chunk_type,
48            num_entries,
49            timestamp,
50            epoch,
51            chunk_first_offset,
52            chunk_crc,
53            trailer_length,
54            reserved,
55            messages,
56        }
57    }
58}
59
60impl Encoder for DeliverCommand {
61    fn encoded_size(&self) -> u32 {
62        self.subscription_id.encoded_size()
63            + self.magic_version.encoded_size()
64            + self.chunk_type.encoded_size()
65            + self.num_entries.encoded_size()
66            + 4 // num records
67            + self.timestamp.encoded_size()
68            + self.epoch.encoded_size()
69            + self.chunk_first_offset.encoded_size()
70            + self.chunk_crc.encoded_size()
71            + self.trailer_length.encoded_size()
72            + self.reserved.encoded_size()
73            + 4 // vec of messages
74            + self.messages.iter().fold(0, |acc, message| {
75                acc + 1 +  message.encoded_size()
76            })
77    }
78
79    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
80        self.subscription_id.encode(writer)?;
81        self.magic_version.encode(writer)?;
82        self.chunk_type.encode(writer)?;
83        self.num_entries.encode(writer)?;
84        u32::encode(&(self.messages.len() as u32), writer)?;
85        self.timestamp.encode(writer)?;
86        self.epoch.encode(writer)?;
87        self.chunk_first_offset.encode(writer)?;
88        self.chunk_crc.encode(writer)?;
89
90        let size = self
91            .messages
92            .iter()
93            .fold(0, |acc, message| acc + 1 + message.encoded_size());
94
95        writer.write_u32::<BigEndian>(size)?;
96        self.trailer_length.encode(writer)?;
97        self.reserved.encode(writer)?;
98
99        for message in &self.messages {
100            message.encoded_size().encode(writer)?;
101            message.encode(writer)?;
102        }
103        Ok(())
104    }
105}
106
107impl Decoder for DeliverCommand {
108    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
109        let (input, subscription_id) = u8::decode(input)?;
110        let (input, magic_version) = i8::decode(input)?;
111        let (input, chunk_type) = u8::decode(input)?;
112        let (input, num_entries) = u16::decode(input)?;
113        let (input, num_records) = u32::decode(input)?;
114        let (input, timestamp) = u64::decode(input)?;
115        let (input, epoch) = u64::decode(input)?;
116        let (input, chunk_first_offset) = u64::decode(input)?;
117        let (input, chunk_crc) = i32::decode(input)?;
118        let (input, _data_length) = u32::decode(input)?;
119        let (input, trailer_length) = u32::decode(input)?;
120        let (mut input, reserved) = u32::decode(input)?;
121
122        let mut messages = Vec::with_capacity(num_records as usize);
123        for _ in 0..num_records {
124            let (input1, result) = read_vec(input)?;
125            let (_, message) = Message::decode(&result)?;
126            messages.push(message);
127            input = input1;
128        }
129
130        Ok((
131            input,
132            DeliverCommand {
133                subscription_id,
134                magic_version,
135                chunk_type,
136                num_entries,
137                timestamp,
138                epoch,
139                chunk_first_offset,
140                chunk_crc,
141                trailer_length,
142                reserved,
143                messages,
144            },
145        ))
146    }
147}
148
149impl Command for DeliverCommand {
150    fn key(&self) -> u16 {
151        COMMAND_DELIVER
152    }
153}
154
155#[cfg(test)]
156mod tests {
157
158    use fake::{Dummy, Faker};
159
160    use crate::{commands::tests::command_encode_decode_test, message::InternalMessage};
161
162    use super::{DeliverCommand, Message};
163    impl Dummy<Faker> for Message {
164        fn dummy_with_rng<R: fake::rand::Rng + ?Sized>(_config: &Faker, _rng: &mut R) -> Self {
165            Message::new(InternalMessage::default())
166        }
167    }
168
169    #[test]
170    fn deliver_request_test() {
171        command_encode_decode_test::<DeliverCommand>();
172    }
173}