ntex_amqp_codec/types/
array.rs

1use ntex_bytes::{BufMut, Bytes, BytesMut};
2
3use crate::codec::{self, ArrayEncode, ArrayHeader, Decode, DecodeFormatted, Encode};
4use crate::error::AmqpParseError;
5use crate::types::Constructor;
6
7#[derive(Debug, Clone, Hash, Eq, PartialEq)]
8pub struct Array {
9    count: u32,
10    element_constructor: Constructor,
11    payload: Bytes,
12}
13
14impl Array {
15    pub fn new<'a, I, T>(iter: I) -> Array
16    where
17        I: Iterator<Item = &'a T>,
18        T: ArrayEncode + 'a,
19    {
20        let mut len = 0;
21        let mut buf = BytesMut::new();
22        for item in iter {
23            len += 1;
24            item.array_encode(&mut buf);
25        }
26
27        Array {
28            count: len,
29            payload: buf.freeze(),
30            element_constructor: T::ARRAY_CONSTRUCTOR,
31        }
32    }
33
34    pub fn element_constructor(&self) -> &Constructor {
35        &self.element_constructor
36    }
37
38    /// Attempts to decode the array into a vector of type `T`. Format code supplied to T::decode_with_format is the format code of the underlying
39    /// AMQP type of array's element constructor. Use `Array::element_constructor` to access full constructor if needed.
40    pub fn decode<T: DecodeFormatted>(&self) -> Result<Vec<T>, AmqpParseError> {
41        let mut buf = self.payload.clone();
42        let mut result: Vec<T> = Vec::with_capacity(self.count as usize);
43        for _ in 0..self.count {
44            let decoded = T::decode_with_format(&mut buf, self.element_constructor.format_code())?;
45            result.push(decoded);
46        }
47        Ok(result)
48    }
49}
50
51impl<T> From<Vec<T>> for Array
52where
53    T: ArrayEncode,
54{
55    fn from(data: Vec<T>) -> Array {
56        Array::new(data.iter())
57    }
58}
59
60impl Encode for Array {
61    fn encoded_size(&self) -> usize {
62        let ctor_len = self.element_constructor.encoded_size();
63        let header_len = if self.payload.len() + ctor_len + 1 > u8::MAX as usize {
64            9 // 1 for format code, 4 for size, 4 for count
65        } else {
66            3 // 1 for format code, 1 for size, 1 for count
67        };
68
69        header_len + ctor_len + self.payload.len()
70    }
71
72    fn encode(&self, buf: &mut BytesMut) {
73        let ctor_len = self.element_constructor.encoded_size();
74        if self.payload.len() + ctor_len + 1 > u8::MAX as usize {
75            buf.put_u8(codec::FORMATCODE_ARRAY32);
76            buf.put_u32((4 + ctor_len + self.payload.len()) as u32); // size. 4 for count
77            buf.put_u32(self.count);
78        } else {
79            buf.put_u8(codec::FORMATCODE_ARRAY8);
80            buf.put_u8((1 + ctor_len + self.payload.len()) as u8); // size. 1 for count
81            buf.put_u8(self.count as u8);
82        }
83        self.element_constructor.encode(buf);
84        buf.extend_from_slice(self.payload.as_ref());
85    }
86}
87
88impl DecodeFormatted for Array {
89    fn decode_with_format(input: &mut Bytes, fmt: u8) -> Result<Self, AmqpParseError> {
90        let header = ArrayHeader::decode_with_format(input, fmt)?;
91        let size = header.size as usize;
92        decode_check_len!(input, size);
93        let mut payload = input.split_to(size);
94        let element_constructor = Constructor::decode(&mut payload)?;
95
96        Ok(Array {
97            element_constructor,
98            payload,
99            count: header.count,
100        })
101    }
102}