rabbitmq_stream_protocol/message/amqp/types/primitives/
list.rs

1use crate::{
2    codec::decoder::{read_u32, read_u8},
3    message::amqp::{
4        codec::constants::TypeCode, AmqpDecodeError, AmqpDecoder, AmqpEncodeError, AmqpEncoder,
5    },
6    utils::TupleMapperSecond,
7};
8
9#[derive(Debug, PartialEq, Eq, Clone, Hash, Default)]
10pub struct List(pub Vec<Value>);
11
12impl List {
13    pub fn new() -> Self {
14        Self(vec![])
15    }
16    pub fn push(&mut self, elem: impl Into<Value>) {
17        self.0.push(elem.into())
18    }
19
20    fn content_size(&self) -> u32 {
21        self.0.iter().fold(0, |acc, item| acc + item.encoded_size())
22    }
23}
24
25impl AmqpEncoder for List {
26    fn encoded_size(&self) -> u32 {
27        let content_size = self.content_size();
28
29        let header = if content_size + 1 > u8::MAX as u32 {
30            9
31        } else {
32            3
33        };
34
35        header + content_size
36    }
37
38    fn encode(&self, writer: &mut impl std::io::Write) -> Result<(), AmqpEncodeError> {
39        let content_size = self.content_size();
40
41        if content_size + 1 > u8::MAX as u32 {
42            TypeCode::List8.encode(writer)?;
43            writer.write_u32::<BigEndian>(content_size + 4)?;
44            writer.write_u32::<BigEndian>(self.0.len() as u32)?;
45        } else {
46            TypeCode::List8.encode(writer)?;
47            writer.write_u8((content_size + 1) as u8)?;
48            writer.write_u8(self.0.len() as u8)?;
49        }
50
51        for item in &self.0 {
52            item.encode(writer)?;
53        }
54        Ok(())
55    }
56}
57
58impl AmqpDecoder for List {
59    fn decode(input: &[u8]) -> Result<(&[u8], Self), AmqpDecodeError> {
60        let (input, code) = TypeCode::decode(input)?;
61
62        let (mut input, count) = match code {
63            TypeCode::List0 => return Ok((input, List::new())),
64            TypeCode::List8 => {
65                let (input, _) = read_u8(input)?;
66                read_u8(input)
67                    .map_second(u32::from)
68                    .map_err(AmqpDecodeError::from)?
69            }
70            TypeCode::List32 => {
71                let (input, _) = read_u32(input)?;
72                read_u32(input).map_err(AmqpDecodeError::from)?
73            }
74            _ => return Err(Self::invalid_type_code(code)),
75        };
76
77        let mut list = List::new();
78
79        for _ in 0..count {
80            let (input_inner, elem) = Value::decode(input)?;
81            list.0.push(elem);
82            input = input_inner;
83        }
84
85        Ok((input, list))
86    }
87}
88
89impl List {
90    pub fn decode_with_fields<'a, T, F>(
91        input: &'a [u8],
92        f: F,
93        mut dest: T,
94    ) -> Result<(&'a [u8], T), AmqpDecodeError>
95    where
96        F: Fn(&'a [u8], u32, &mut T) -> Result<&'a [u8], AmqpDecodeError>,
97    {
98        let (input, code) = TypeCode::decode(input)?;
99
100        let (mut input, count) = match code {
101            TypeCode::List0 => return Ok((input, dest)),
102            TypeCode::List8 => {
103                let (input, _) = read_u8(input)?;
104                read_u8(input).map_second(u32::from)?
105            }
106            TypeCode::List32 => {
107                let (input, _) = read_u32(input)?;
108                read_u32(input)?
109            }
110
111            _ => {
112                return Err(AmqpDecodeError::MessageParse(format!(
113                    "Invalid type code {:?} for list",
114                    code
115                )))
116            }
117        };
118
119        for idx in 0..count {
120            let input1 = f(input, idx, &mut dest)?;
121            input = input1;
122        }
123        Ok((input, dest))
124    }
125}
126#[macro_export(local_inner_macros)]
127macro_rules! list_decoder {
128    ($ty:ty, $fname:ident, { $($key:expr => $b:tt),* }) => {
129        fn $fname<'a>(
130            input: &'a [u8],
131            idx: u32,
132            dest: &mut $ty,
133        ) -> Result<&'a [u8], AmqpDecodeError> {
134
135            match idx {
136                 $($key => {
137                      list_decoder!(@internal dest, input, $b)
138                 })*
139                _ => {
140                    Ok(input)
141                }
142            }
143        }
144    };
145    (@internal $dest:ident, $input:ident, { $name:ident, $dt:ident, $def:expr }) => {
146         {
147             let (input, value) = Option::<$dt>::decode($input)?;
148             $dest.$name = value.unwrap_or($def);
149             Ok(input)
150         }
151    };
152    (@internal $dest:ident, $input:ident, { $name:ident, $dt:ident, $def:expr, $optional:expr }) => {
153        {
154            let (input, value) = Option::<$dt>::decode($input)?;
155            $dest.$name = value;
156            Ok(input)
157        }
158    }
159}
160
161use byteorder::{BigEndian, WriteBytesExt};
162pub use list_decoder;
163
164use super::Value;
165
166#[cfg(test)]
167mod tests {
168    use std::ops::Range;
169
170    use fake::{Dummy, Fake, Faker};
171
172    use crate::message::amqp::{
173        tests::type_encode_decode_test_fuzzy,
174        types::{SimpleValue, Value},
175    };
176
177    use super::List;
178    const DEFAULT_LEN_RANGE: Range<usize> = 0..10;
179
180    impl Dummy<Faker> for List {
181        fn dummy_with_rng<R: fake::rand::Rng + ?Sized>(config: &Faker, rng: &mut R) -> Self {
182            let len: usize = DEFAULT_LEN_RANGE.fake_with_rng(rng);
183            let mut m = List::new();
184
185            for _ in 0..len {
186                let simple: SimpleValue = config.fake_with_rng(rng);
187
188                m.0.push(Value::Simple(simple));
189            }
190            m
191        }
192    }
193
194    #[test]
195    fn list_encode_decode_test() {
196        type_encode_decode_test_fuzzy::<List>()
197    }
198}