rabbitmq_stream_protocol/commands/
subscribe.rs1use std::collections::HashMap;
2use std::io::Write;
3
4#[cfg(test)]
5use fake::Fake;
6
7use crate::{
8 codec::{Decoder, Encoder},
9 error::{DecodeError, EncodeError},
10 protocol::commands::COMMAND_SUBSCRIBE,
11};
12
13use super::Command;
14
15#[cfg_attr(test, derive(fake::Dummy))]
16#[derive(PartialEq, Eq, Debug)]
17pub struct SubscribeCommand {
18 correlation_id: u32,
19 subscription_id: u8,
20 stream_name: String,
21 offset_specification: OffsetSpecification,
22 credit: u16,
23 properties: HashMap<String, String>,
24}
25
26impl SubscribeCommand {
27 pub fn new(
28 correlation_id: u32,
29 subscription_id: u8,
30 stream_name: String,
31 offset_specification: OffsetSpecification,
32 credit: u16,
33 properties: HashMap<String, String>,
34 ) -> Self {
35 Self {
36 correlation_id,
37 subscription_id,
38 stream_name,
39 offset_specification,
40 credit,
41 properties,
42 }
43 }
44}
45
46impl Encoder for SubscribeCommand {
47 fn encoded_size(&self) -> u32 {
48 self.correlation_id.encoded_size()
49 + self.subscription_id.encoded_size()
50 + self.stream_name.as_str().encoded_size()
51 + self.offset_specification.encoded_size()
52 + self.credit.encoded_size()
53 + self.properties.encoded_size()
54 }
55
56 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
57 self.correlation_id.encode(writer)?;
58 self.subscription_id.encode(writer)?;
59 self.stream_name.as_str().encode(writer)?;
60 self.offset_specification.encode(writer)?;
61 self.credit.encode(writer)?;
62 self.properties.encode(writer)?;
63 Ok(())
64 }
65}
66
67impl Decoder for SubscribeCommand {
68 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
69 let (input, correlation_id) = u32::decode(input)?;
70 let (input, subscription_id) = u8::decode(input)?;
71 let (input, stream_name) = Option::decode(input)?;
72 let (input, offset_specification) = OffsetSpecification::decode(input)?;
73 let (input, credit) = u16::decode(input)?;
74 let (input, properties) = HashMap::decode(input)?;
75
76 Ok((
77 input,
78 SubscribeCommand {
79 correlation_id,
80 subscription_id,
81 stream_name: stream_name.unwrap(),
82 offset_specification,
83 credit,
84 properties,
85 },
86 ))
87 }
88}
89
90impl Command for SubscribeCommand {
91 fn key(&self) -> u16 {
92 COMMAND_SUBSCRIBE
93 }
94}
95
/// Offset specification carried by a `SubscribeCommand`.
///
/// Each variant is encoded as a `u16` type tag; `Offset` and `Timestamp`
/// additionally carry an 8-byte-wide integer payload after the tag.
#[cfg_attr(test, derive(fake::Dummy))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OffsetSpecification {
    /// Type tag `1`; no payload.
    First,
    /// Type tag `2`; no payload.
    Last,
    /// Type tag `3`; no payload.
    Next,
    /// Type tag `4`; carries a `u64` offset.
    Offset(u64),
    /// Type tag `5`; carries an `i64` timestamp.
    // NOTE(review): the timestamp unit is not visible in this file — confirm
    // against the protocol specification.
    Timestamp(i64),
}
105
106impl OffsetSpecification {
107 fn get_type(&self) -> u16 {
108 match self {
109 OffsetSpecification::First => 1,
110 OffsetSpecification::Last => 2,
111 OffsetSpecification::Next => 3,
112 OffsetSpecification::Offset(_) => 4,
113 OffsetSpecification::Timestamp(_) => 5,
114 }
115 }
116}
117
118impl Encoder for OffsetSpecification {
119 fn encoded_size(&self) -> u32 {
120 self.get_type().encoded_size()
121 + match self {
122 OffsetSpecification::Offset(offset) => offset.encoded_size(),
123 OffsetSpecification::Timestamp(timestamp) => timestamp.encoded_size(),
124 _ => 0,
125 }
126 }
127
128 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
129 self.get_type().encode(writer)?;
130 match self {
131 OffsetSpecification::Offset(offset) => offset.encode(writer),
132 OffsetSpecification::Timestamp(timestamp) => timestamp.encode(writer),
133 _ => Ok(()),
134 }
135 }
136}
137
138impl Decoder for OffsetSpecification {
139 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
140 let (input, offset_type) = u16::decode(input)?;
141
142 match offset_type {
143 1 => Ok((input, OffsetSpecification::First)),
144 2 => Ok((input, OffsetSpecification::Last)),
145 3 => Ok((input, OffsetSpecification::Next)),
146 4 => {
147 let (input, offset) = u64::decode(input)?;
148 Ok((input, OffsetSpecification::Offset(offset)))
149 }
150 5 => {
151 let (input, timestamp) = i64::decode(input)?;
152 Ok((input, OffsetSpecification::Timestamp(timestamp)))
153 }
154 _ => panic!("Offset type not supported"),
155 }
156 }
157}
158
#[cfg(test)]
mod tests {
    use super::SubscribeCommand;
    use crate::commands::tests::command_encode_decode_test;

    /// Runs the shared command encode/decode helper against `SubscribeCommand`.
    #[test]
    fn subscribe_request_test() {
        command_encode_decode_test::<SubscribeCommand>();
    }
}
169}