ntex_amqp_codec/message/
body.rs

1use ntex_bytes::{BufMut, Bytes, BytesMut};
2
3use crate::codec::{Encode, FORMATCODE_BINARY32, FORMATCODE_BINARY8};
4use crate::protocol::TransferBody;
5use crate::types::{Descriptor, List, Variant};
6
7use super::SECTION_PREFIX_LENGTH;
8
9#[derive(Debug, Clone, Default, PartialEq, Eq)]
10pub struct MessageBody {
11    pub data: Vec<Bytes>,
12    pub sequence: Vec<List>,
13    pub messages: Vec<TransferBody>,
14    pub value: Option<Variant>,
15}
16
17impl MessageBody {
18    pub fn data(&self) -> Option<&Bytes> {
19        if self.data.is_empty() {
20            None
21        } else {
22            Some(&self.data[0])
23        }
24    }
25
26    pub fn value(&self) -> Option<&Variant> {
27        self.value.as_ref()
28    }
29
30    pub fn set_data(&mut self, data: Bytes) {
31        self.data.clear();
32        self.data.push(data);
33    }
34}
35
36impl Encode for MessageBody {
37    fn encoded_size(&self) -> usize {
38        let mut size = self
39            .data
40            .iter()
41            .fold(0, |a, d| a + d.encoded_size() + SECTION_PREFIX_LENGTH);
42        size += self
43            .sequence
44            .iter()
45            .fold(0, |a, seq| a + seq.encoded_size() + SECTION_PREFIX_LENGTH);
46        size += self.messages.iter().fold(0, |a, m| {
47            let length = m.encoded_size();
48            let size = length + if length > u8::MAX as usize { 5 } else { 2 };
49            a + size + SECTION_PREFIX_LENGTH
50        });
51
52        if let Some(ref val) = self.value {
53            size + val.encoded_size() + SECTION_PREFIX_LENGTH
54        } else {
55            size
56        }
57    }
58
59    fn encode(&self, dst: &mut BytesMut) {
60        self.data.iter().for_each(|d| {
61            Descriptor::Ulong(117).encode(dst);
62            d.encode(dst);
63        });
64        self.sequence.iter().for_each(|seq| {
65            Descriptor::Ulong(118).encode(dst);
66            seq.encode(dst)
67        });
68        if let Some(ref val) = self.value {
69            Descriptor::Ulong(119).encode(dst);
70            val.encode(dst);
71        }
72        // encode Message as nested Bytes object
73        self.messages.iter().for_each(|m| {
74            Descriptor::Ulong(117).encode(dst);
75
76            // Bytes prefix
77            let length = m.encoded_size();
78            if length > u8::MAX as usize {
79                dst.put_u8(FORMATCODE_BINARY32);
80                dst.put_u32(length as u32);
81            } else {
82                dst.put_u8(FORMATCODE_BINARY8);
83                dst.put_u8(length as u8);
84            }
85            // encode nested Message
86            m.encode(dst);
87        });
88    }
89}