rabbitmq_stream_protocol/commands/
subscribe.rs1use std::collections::HashMap;
2use std::io::Write;
3
4use crate::{
5 codec::{Decoder, Encoder},
6 error::{DecodeError, EncodeError},
7 protocol::commands::COMMAND_SUBSCRIBE,
8};
9
10use super::Command;
11
12#[cfg_attr(test, derive(fake::Dummy))]
13#[derive(PartialEq, Eq, Debug)]
14pub struct SubscribeCommand {
15 correlation_id: u32,
16 subscription_id: u8,
17 stream_name: String,
18 offset_specification: OffsetSpecification,
19 credit: u16,
20 properties: HashMap<String, String>,
21}
22
23impl SubscribeCommand {
24 pub fn new(
25 correlation_id: u32,
26 subscription_id: u8,
27 stream_name: String,
28 offset_specification: OffsetSpecification,
29 credit: u16,
30 properties: HashMap<String, String>,
31 ) -> Self {
32 Self {
33 correlation_id,
34 subscription_id,
35 stream_name,
36 offset_specification,
37 credit,
38 properties,
39 }
40 }
41}
42
43impl Encoder for SubscribeCommand {
44 fn encoded_size(&self) -> u32 {
45 self.correlation_id.encoded_size()
46 + self.subscription_id.encoded_size()
47 + self.stream_name.as_str().encoded_size()
48 + self.offset_specification.encoded_size()
49 + self.credit.encoded_size()
50 + self.properties.encoded_size()
51 }
52
53 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
54 self.correlation_id.encode(writer)?;
55 self.subscription_id.encode(writer)?;
56 self.stream_name.as_str().encode(writer)?;
57 self.offset_specification.encode(writer)?;
58 self.credit.encode(writer)?;
59 self.properties.encode(writer)?;
60 Ok(())
61 }
62}
63
64impl Decoder for SubscribeCommand {
65 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
66 let (input, correlation_id) = u32::decode(input)?;
67 let (input, subscription_id) = u8::decode(input)?;
68 let (input, stream_name) = Option::decode(input)?;
69 let (input, offset_specification) = OffsetSpecification::decode(input)?;
70 let (input, credit) = u16::decode(input)?;
71 let (input, properties) = HashMap::decode(input)?;
72
73 Ok((
74 input,
75 SubscribeCommand {
76 correlation_id,
77 subscription_id,
78 stream_name: stream_name.unwrap(),
79 offset_specification,
80 credit,
81 properties,
82 },
83 ))
84 }
85}
86
87impl Command for SubscribeCommand {
88 fn key(&self) -> u16 {
89 COMMAND_SUBSCRIBE
90 }
91}
92
93#[cfg_attr(test, derive(fake::Dummy))]
94#[derive(PartialEq, Eq, Debug, Clone)]
95pub enum OffsetSpecification {
96 First,
97 Last,
98 Next,
99 Offset(u64),
100 Timestamp(i64),
101}
102
103impl OffsetSpecification {
104 fn get_type(&self) -> u16 {
105 match self {
106 OffsetSpecification::First => 1,
107 OffsetSpecification::Last => 2,
108 OffsetSpecification::Next => 3,
109 OffsetSpecification::Offset(_) => 4,
110 OffsetSpecification::Timestamp(_) => 5,
111 }
112 }
113}
114
115impl Encoder for OffsetSpecification {
116 fn encoded_size(&self) -> u32 {
117 self.get_type().encoded_size()
118 + match self {
119 OffsetSpecification::Offset(offset) => offset.encoded_size(),
120 OffsetSpecification::Timestamp(timestamp) => timestamp.encoded_size(),
121 _ => 0,
122 }
123 }
124
125 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
126 self.get_type().encode(writer)?;
127 match self {
128 OffsetSpecification::Offset(offset) => offset.encode(writer),
129 OffsetSpecification::Timestamp(timestamp) => timestamp.encode(writer),
130 _ => Ok(()),
131 }
132 }
133}
134
135impl Decoder for OffsetSpecification {
136 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
137 let (input, offset_type) = u16::decode(input)?;
138
139 match offset_type {
140 1 => Ok((input, OffsetSpecification::First)),
141 2 => Ok((input, OffsetSpecification::Last)),
142 3 => Ok((input, OffsetSpecification::Next)),
143 4 => {
144 let (input, offset) = u64::decode(input)?;
145 Ok((input, OffsetSpecification::Offset(offset)))
146 }
147 5 => {
148 let (input, timestamp) = i64::decode(input)?;
149 Ok((input, OffsetSpecification::Timestamp(timestamp)))
150 }
151 _ => panic!("Offset type not supported"),
152 }
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use crate::commands::tests::command_encode_decode_test;
159
160 use super::SubscribeCommand;
161
162 #[test]
163 fn subscribe_request_test() {
164 command_encode_decode_test::<SubscribeCommand>();
165 }
166}