rabbitmq_stream_protocol/commands/
store_offset.rs

1use crate::{
2    codec::{Decoder, Encoder},
3    error::{DecodeError, EncodeError},
4    protocol::commands::COMMAND_STORE_OFFSET,
5};
6use std::io::Write;
7
8use super::Command;
9
10#[cfg_attr(test, derive(fake::Dummy))]
11#[derive(PartialEq, Eq, Debug)]
12pub struct StoreOffset {
13    reference: String,
14    stream: String,
15    offset: u64,
16}
17
18impl StoreOffset {
19    pub fn new(reference: String, stream: String, offset: u64) -> Self {
20        Self {
21            reference,
22            stream,
23            offset,
24        }
25    }
26}
27
28impl Encoder for StoreOffset {
29    fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
30        let size = self.reference.len();
31        if size >= 256 {
32            return Err(EncodeError::MaxSizeError(size));
33        }
34
35        self.reference.as_str().encode(writer)?;
36        self.stream.as_str().encode(writer)?;
37        self.offset.encode(writer)?;
38        Ok(())
39    }
40
41    fn encoded_size(&self) -> u32 {
42        self.reference.as_str().encoded_size()
43            + self.stream.as_str().encoded_size()
44            + self.offset.encoded_size()
45    }
46}
47
48impl Command for StoreOffset {
49    fn key(&self) -> u16 {
50        COMMAND_STORE_OFFSET
51    }
52}
53impl Decoder for StoreOffset {
54    fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
55        let (input, opt_reference) = Option::decode(input)?;
56        let (input, opt_stream) = Option::decode(input)?;
57        if let (Some(reference), Some(stream)) = (opt_reference, opt_stream) {
58            match reference.len() {
59                0..=255 => {
60                    let (input, offset) = u64::decode(input)?;
61
62                    return Ok((
63                        input,
64                        StoreOffset {
65                            reference,
66                            stream,
67                            offset,
68                        },
69                    ));
70                }
71                size => return Err(DecodeError::MismatchSize(size)),
72            }
73        }
74
75        Err(DecodeError::Empty)
76    }
77}
78
79#[cfg(test)]
80mod tests {
81    use super::StoreOffset;
82    use crate::commands::tests::command_encode_decode_test;
83
84    #[test]
85    fn open_response_test() {
86        command_encode_decode_test::<StoreOffset>();
87    }
88}