// rabbitmq_stream_protocol/commands/deliver.rs
use std::io::Write;

use super::Command;
use crate::codec::decoder::read_vec;
use crate::message::Message;
use crate::{
    codec::{Decoder, Encoder},
    error::{DecodeError, EncodeError},
    protocol::commands::COMMAND_DELIVER,
};
use byteorder::{BigEndian, WriteBytesExt};

13#[cfg_attr(test, derive(fake::Dummy))]
14#[derive(PartialEq, Eq, Debug, Clone)]
15pub struct DeliverCommand {
16 pub subscription_id: u8,
17 magic_version: i8,
18 chunk_type: u8,
19 num_entries: u16,
20 timestamp: u64,
21 epoch: u64,
22 pub chunk_first_offset: u64,
23 chunk_crc: i32,
24 trailer_length: u32,
25 reserved: u32,
26 pub messages: Vec<Message>,
27}
28
29#[allow(clippy::too_many_arguments)]
30impl DeliverCommand {
31 pub fn new(
32 subscription_id: u8,
33 magic_version: i8,
34 chunk_type: u8,
35 num_entries: u16,
36 timestamp: u64,
37 epoch: u64,
38 chunk_first_offset: u64,
39 chunk_crc: i32,
40 trailer_length: u32,
41 reserved: u32,
42 messages: Vec<Message>,
43 ) -> Self {
44 Self {
45 subscription_id,
46 magic_version,
47 chunk_type,
48 num_entries,
49 timestamp,
50 epoch,
51 chunk_first_offset,
52 chunk_crc,
53 trailer_length,
54 reserved,
55 messages,
56 }
57 }
58}
59
60impl Encoder for DeliverCommand {
61 fn encoded_size(&self) -> u32 {
62 self.subscription_id.encoded_size()
63 + self.magic_version.encoded_size()
64 + self.chunk_type.encoded_size()
65 + self.num_entries.encoded_size()
66 + 4 + self.timestamp.encoded_size()
68 + self.epoch.encoded_size()
69 + self.chunk_first_offset.encoded_size()
70 + self.chunk_crc.encoded_size()
71 + self.trailer_length.encoded_size()
72 + self.reserved.encoded_size()
73 + 4 + self.messages.iter().fold(0, |acc, message| {
75 acc + 1 + message.encoded_size()
76 })
77 }
78
79 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
80 self.subscription_id.encode(writer)?;
81 self.magic_version.encode(writer)?;
82 self.chunk_type.encode(writer)?;
83 self.num_entries.encode(writer)?;
84 u32::encode(&(self.messages.len() as u32), writer)?;
85 self.timestamp.encode(writer)?;
86 self.epoch.encode(writer)?;
87 self.chunk_first_offset.encode(writer)?;
88 self.chunk_crc.encode(writer)?;
89
90 let size = self
91 .messages
92 .iter()
93 .fold(0, |acc, message| acc + 1 + message.encoded_size());
94
95 writer.write_u32::<BigEndian>(size)?;
96 self.trailer_length.encode(writer)?;
97 self.reserved.encode(writer)?;
98
99 for message in &self.messages {
100 message.encoded_size().encode(writer)?;
101 message.encode(writer)?;
102 }
103 Ok(())
104 }
105}
106
107impl Decoder for DeliverCommand {
108 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
109 let (input, subscription_id) = u8::decode(input)?;
110 let (input, magic_version) = i8::decode(input)?;
111 let (input, chunk_type) = u8::decode(input)?;
112 let (input, num_entries) = u16::decode(input)?;
113 let (input, num_records) = u32::decode(input)?;
114 let (input, timestamp) = u64::decode(input)?;
115 let (input, epoch) = u64::decode(input)?;
116 let (input, chunk_first_offset) = u64::decode(input)?;
117 let (input, chunk_crc) = i32::decode(input)?;
118 let (input, _data_length) = u32::decode(input)?;
119 let (input, trailer_length) = u32::decode(input)?;
120 let (mut input, reserved) = u32::decode(input)?;
121
122 let mut messages = Vec::with_capacity(num_records as usize);
123 for _ in 0..num_records {
124 let (input1, result) = read_vec(input)?;
125 let (_, message) = Message::decode(&result)?;
126 messages.push(message);
127 input = input1;
128 }
129
130 Ok((
131 input,
132 DeliverCommand {
133 subscription_id,
134 magic_version,
135 chunk_type,
136 num_entries,
137 timestamp,
138 epoch,
139 chunk_first_offset,
140 chunk_crc,
141 trailer_length,
142 reserved,
143 messages,
144 },
145 ))
146 }
147}
148
149impl Command for DeliverCommand {
150 fn key(&self) -> u16 {
151 COMMAND_DELIVER
152 }
153}
154
#[cfg(test)]
mod tests {
    use fake::{Dummy, Faker};

    use super::{DeliverCommand, Message};
    use crate::{commands::tests::command_encode_decode_test, message::InternalMessage};

    /// Manual `Dummy` impl so the derived `Dummy` on `DeliverCommand` can fill
    /// its `messages` field. Every generated message wraps a default
    /// `InternalMessage`; the config and RNG are deliberately unused.
    impl Dummy<Faker> for Message {
        fn dummy_with_rng<R>(_: &Faker, _: &mut R) -> Self
        where
            R: fake::rand::Rng + ?Sized,
        {
            Self::new(InternalMessage::default())
        }
    }

    /// Runs the shared `command_encode_decode_test` helper for `DeliverCommand`.
    #[test]
    fn deliver_request_test() {
        command_encode_decode_test::<DeliverCommand>();
    }
}