rabbitmq_stream_protocol/commands/
subscribe.rs

1use std::collections::HashMap;
2use std::io::Write;
3
4use crate::{
5    codec::{Decoder, Encoder},
6    error::{DecodeError, EncodeError},
7    protocol::commands::COMMAND_SUBSCRIBE,
8};
9
10use super::Command;
11
12#[cfg_attr(test, derive(fake::Dummy))]
13#[derive(PartialEq, Eq, Debug)]
14pub struct SubscribeCommand {
15    correlation_id: u32,
16    subscription_id: u8,
17    stream_name: String,
18    offset_specification: OffsetSpecification,
19    credit: u16,
20    properties: HashMap<String, String>,
21}
22
23impl SubscribeCommand {
24    pub fn new(
25        correlation_id: u32,
26        subscription_id: u8,
27        stream_name: String,
28        offset_specification: OffsetSpecification,
29        credit: u16,
30        properties: HashMap<String, String>,
31    ) -> Self {
32        Self {
33            correlation_id,
34            subscription_id,
35            stream_name,
36            offset_specification,
37            credit,
38            properties,
39        }
40    }
41}
42
43impl Encoder for SubscribeCommand {
44    fn encoded_size(&self) -> u32 {
45        self.correlation_id.encoded_size()
46            + self.subscription_id.encoded_size()
47            + self.stream_name.as_str().encoded_size()
48            + self.offset_specification.encoded_size()
49            + self.credit.encoded_size()
50            + self.properties.encoded_size()
51    }
52
53    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
54        self.correlation_id.encode(writer)?;
55        self.subscription_id.encode(writer)?;
56        self.stream_name.as_str().encode(writer)?;
57        self.offset_specification.encode(writer)?;
58        self.credit.encode(writer)?;
59        self.properties.encode(writer)?;
60        Ok(())
61    }
62}
63
64impl Decoder for SubscribeCommand {
65    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
66        let (input, correlation_id) = u32::decode(input)?;
67        let (input, subscription_id) = u8::decode(input)?;
68        let (input, stream_name) = Option::decode(input)?;
69        let (input, offset_specification) = OffsetSpecification::decode(input)?;
70        let (input, credit) = u16::decode(input)?;
71        let (input, properties) = HashMap::decode(input)?;
72
73        Ok((
74            input,
75            SubscribeCommand {
76                correlation_id,
77                subscription_id,
78                stream_name: stream_name.unwrap(),
79                offset_specification,
80                credit,
81                properties,
82            },
83        ))
84    }
85}
86
87impl Command for SubscribeCommand {
88    fn key(&self) -> u16 {
89        COMMAND_SUBSCRIBE
90    }
91}
92
93#[cfg_attr(test, derive(fake::Dummy))]
94#[derive(PartialEq, Eq, Debug, Clone)]
95pub enum OffsetSpecification {
96    First,
97    Last,
98    Next,
99    Offset(u64),
100    Timestamp(i64),
101}
102
103impl OffsetSpecification {
104    fn get_type(&self) -> u16 {
105        match self {
106            OffsetSpecification::First => 1,
107            OffsetSpecification::Last => 2,
108            OffsetSpecification::Next => 3,
109            OffsetSpecification::Offset(_) => 4,
110            OffsetSpecification::Timestamp(_) => 5,
111        }
112    }
113}
114
115impl Encoder for OffsetSpecification {
116    fn encoded_size(&self) -> u32 {
117        self.get_type().encoded_size()
118            + match self {
119                OffsetSpecification::Offset(offset) => offset.encoded_size(),
120                OffsetSpecification::Timestamp(timestamp) => timestamp.encoded_size(),
121                _ => 0,
122            }
123    }
124
125    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
126        self.get_type().encode(writer)?;
127        match self {
128            OffsetSpecification::Offset(offset) => offset.encode(writer),
129            OffsetSpecification::Timestamp(timestamp) => timestamp.encode(writer),
130            _ => Ok(()),
131        }
132    }
133}
134
135impl Decoder for OffsetSpecification {
136    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
137        let (input, offset_type) = u16::decode(input)?;
138
139        match offset_type {
140            1 => Ok((input, OffsetSpecification::First)),
141            2 => Ok((input, OffsetSpecification::Last)),
142            3 => Ok((input, OffsetSpecification::Next)),
143            4 => {
144                let (input, offset) = u64::decode(input)?;
145                Ok((input, OffsetSpecification::Offset(offset)))
146            }
147            5 => {
148                let (input, timestamp) = i64::decode(input)?;
149                Ok((input, OffsetSpecification::Timestamp(timestamp)))
150            }
151            _ => panic!("Offset type not supported"),
152        }
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use crate::commands::tests::command_encode_decode_test;
159
160    use super::SubscribeCommand;
161
162    #[test]
163    fn subscribe_request_test() {
164        command_encode_decode_test::<SubscribeCommand>();
165    }
166}