rabbitmq_stream_protocol/commands/
subscribe.rs

1use std::collections::HashMap;
2use std::io::Write;
3
4#[cfg(test)]
5use fake::Fake;
6
7use crate::{
8    codec::{Decoder, Encoder},
9    error::{DecodeError, EncodeError},
10    protocol::commands::COMMAND_SUBSCRIBE,
11};
12
13use super::Command;
14
15#[cfg_attr(test, derive(fake::Dummy))]
16#[derive(PartialEq, Eq, Debug)]
17pub struct SubscribeCommand {
18    correlation_id: u32,
19    subscription_id: u8,
20    stream_name: String,
21    offset_specification: OffsetSpecification,
22    credit: u16,
23    properties: HashMap<String, String>,
24}
25
26impl SubscribeCommand {
27    pub fn new(
28        correlation_id: u32,
29        subscription_id: u8,
30        stream_name: String,
31        offset_specification: OffsetSpecification,
32        credit: u16,
33        properties: HashMap<String, String>,
34    ) -> Self {
35        Self {
36            correlation_id,
37            subscription_id,
38            stream_name,
39            offset_specification,
40            credit,
41            properties,
42        }
43    }
44}
45
46impl Encoder for SubscribeCommand {
47    fn encoded_size(&self) -> u32 {
48        self.correlation_id.encoded_size()
49            + self.subscription_id.encoded_size()
50            + self.stream_name.as_str().encoded_size()
51            + self.offset_specification.encoded_size()
52            + self.credit.encoded_size()
53            + self.properties.encoded_size()
54    }
55
56    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
57        self.correlation_id.encode(writer)?;
58        self.subscription_id.encode(writer)?;
59        self.stream_name.as_str().encode(writer)?;
60        self.offset_specification.encode(writer)?;
61        self.credit.encode(writer)?;
62        self.properties.encode(writer)?;
63        Ok(())
64    }
65}
66
67impl Decoder for SubscribeCommand {
68    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
69        let (input, correlation_id) = u32::decode(input)?;
70        let (input, subscription_id) = u8::decode(input)?;
71        let (input, stream_name) = Option::decode(input)?;
72        let (input, offset_specification) = OffsetSpecification::decode(input)?;
73        let (input, credit) = u16::decode(input)?;
74        let (input, properties) = HashMap::decode(input)?;
75
76        Ok((
77            input,
78            SubscribeCommand {
79                correlation_id,
80                subscription_id,
81                stream_name: stream_name.unwrap(),
82                offset_specification,
83                credit,
84                properties,
85            },
86        ))
87    }
88}
89
90impl Command for SubscribeCommand {
91    fn key(&self) -> u16 {
92        COMMAND_SUBSCRIBE
93    }
94}
95
96#[cfg_attr(test, derive(fake::Dummy))]
97#[derive(PartialEq, Eq, Debug, Clone)]
98pub enum OffsetSpecification {
99    First,
100    Last,
101    Next,
102    Offset(u64),
103    Timestamp(i64),
104}
105
106impl OffsetSpecification {
107    fn get_type(&self) -> u16 {
108        match self {
109            OffsetSpecification::First => 1,
110            OffsetSpecification::Last => 2,
111            OffsetSpecification::Next => 3,
112            OffsetSpecification::Offset(_) => 4,
113            OffsetSpecification::Timestamp(_) => 5,
114        }
115    }
116}
117
118impl Encoder for OffsetSpecification {
119    fn encoded_size(&self) -> u32 {
120        self.get_type().encoded_size()
121            + match self {
122                OffsetSpecification::Offset(offset) => offset.encoded_size(),
123                OffsetSpecification::Timestamp(timestamp) => timestamp.encoded_size(),
124                _ => 0,
125            }
126    }
127
128    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
129        self.get_type().encode(writer)?;
130        match self {
131            OffsetSpecification::Offset(offset) => offset.encode(writer),
132            OffsetSpecification::Timestamp(timestamp) => timestamp.encode(writer),
133            _ => Ok(()),
134        }
135    }
136}
137
138impl Decoder for OffsetSpecification {
139    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
140        let (input, offset_type) = u16::decode(input)?;
141
142        match offset_type {
143            1 => Ok((input, OffsetSpecification::First)),
144            2 => Ok((input, OffsetSpecification::Last)),
145            3 => Ok((input, OffsetSpecification::Next)),
146            4 => {
147                let (input, offset) = u64::decode(input)?;
148                Ok((input, OffsetSpecification::Offset(offset)))
149            }
150            5 => {
151                let (input, timestamp) = i64::decode(input)?;
152                Ok((input, OffsetSpecification::Timestamp(timestamp)))
153            }
154            _ => panic!("Offset type not supported"),
155        }
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use crate::commands::tests::command_encode_decode_test;
162
163    use super::SubscribeCommand;
164
165    #[test]
166    fn subscribe_request_test() {
167        command_encode_decode_test::<SubscribeCommand>();
168    }
169}