rabbitmq_stream_protocol/commands/
store_offset.rs1use crate::{
2 codec::{Decoder, Encoder},
3 error::{DecodeError, EncodeError},
4 protocol::commands::COMMAND_STORE_OFFSET,
5};
6use std::io::Write;
7
8use super::Command;
9
10#[cfg_attr(test, derive(fake::Dummy))]
11#[derive(PartialEq, Eq, Debug)]
12pub struct StoreOffset {
13 reference: String,
14 stream: String,
15 offset: u64,
16}
17
18impl StoreOffset {
19 pub fn new(reference: String, stream: String, offset: u64) -> Self {
20 Self {
21 reference,
22 stream,
23 offset,
24 }
25 }
26}
27
28impl Encoder for StoreOffset {
29 fn encode(&self, writer: &mut impl Write) -> Result<(), EncodeError> {
30 let size = self.reference.len();
31 if size >= 256 {
32 return Err(EncodeError::MaxSizeError(size));
33 }
34
35 self.reference.as_str().encode(writer)?;
36 self.stream.as_str().encode(writer)?;
37 self.offset.encode(writer)?;
38 Ok(())
39 }
40
41 fn encoded_size(&self) -> u32 {
42 self.reference.as_str().encoded_size()
43 + self.stream.as_str().encoded_size()
44 + self.offset.encoded_size()
45 }
46}
47
48impl Command for StoreOffset {
49 fn key(&self) -> u16 {
50 COMMAND_STORE_OFFSET
51 }
52}
53impl Decoder for StoreOffset {
54 fn decode(input: &[u8]) -> Result<(&[u8], Self), DecodeError> {
55 let (input, opt_reference) = Option::decode(input)?;
56 let (input, opt_stream) = Option::decode(input)?;
57 if let (Some(reference), Some(stream)) = (opt_reference, opt_stream) {
58 match reference.len() {
59 0..=255 => {
60 let (input, offset) = u64::decode(input)?;
61
62 return Ok((
63 input,
64 StoreOffset {
65 reference,
66 stream,
67 offset,
68 },
69 ));
70 }
71 size => return Err(DecodeError::MismatchSize(size)),
72 }
73 }
74
75 Err(DecodeError::Empty)
76 }
77}
78
79#[cfg(test)]
80mod tests {
81 use super::StoreOffset;
82 use crate::commands::tests::command_encode_decode_test;
83
84 #[test]
85 fn open_response_test() {
86 command_encode_decode_test::<StoreOffset>();
87 }
88}