rabbitmq_stream_protocol/commands/
publish.rs1use std::io::Write;
2
3use crate::{
4 codec::{Decoder, Encoder},
5 error::{DecodeError, EncodeError},
6 protocol::commands::COMMAND_PUBLISH,
7 protocol::version::PROTOCOL_VERSION_2,
8};
9
10use super::Command;
11
12use crate::types::PublishedMessage;
13
14#[cfg_attr(test, derive(fake::Dummy))]
15#[derive(PartialEq, Eq, Debug)]
16pub struct PublishCommand {
17 publisher_id: u8,
18 published_messages: Vec<PublishedMessage>,
19 #[cfg_attr(test, dummy(faker = "1"))]
20 version: u16,
21}
22
23impl PublishCommand {
24 pub fn new(publisher_id: u8, published_messages: Vec<PublishedMessage>, version: u16) -> Self {
25 Self {
26 publisher_id,
27 published_messages,
28 version,
29 }
30 }
31}
32
33impl Encoder for PublishCommand {
34 fn encoded_size(&self) -> u32 {
35 if self.version == PROTOCOL_VERSION_2 {
36 self.publisher_id.encoded_size() + self.published_messages.encoded_size_version_2()
37 } else {
38 self.publisher_id.encoded_size() + self.published_messages.encoded_size()
39 }
40 }
41
42 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
43 self.publisher_id.encode(writer)?;
44 if self.version == PROTOCOL_VERSION_2 {
45 self.published_messages.encode_version_2(writer)?;
46 } else {
47 self.published_messages.encode(writer)?;
48 }
49 Ok(())
50 }
51}
52
53impl Command for PublishCommand {
54 fn key(&self) -> u16 {
55 COMMAND_PUBLISH
56 }
57
58 fn version(&self) -> u16 {
59 self.version
60 }
61}
62
63impl Decoder for PublishCommand {
64 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
65 let (input, publisher_id) = u8::decode(input)?;
66 let (input, published_messages) = Vec::decode(input)?;
67
68 Ok((
69 input,
70 PublishCommand {
71 publisher_id,
72 published_messages,
73 version: 1,
74 },
75 ))
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::PublishCommand;
    use crate::commands::tests::command_encode_decode_test;

    /// Runs the shared `command_encode_decode_test` helper for
    /// `PublishCommand` (presumably an encode/decode round trip — see the
    /// helper's definition).
    #[test]
    fn publish_request_test() {
        command_encode_decode_test::<PublishCommand>();
    }
}
90}