amq_protocol/frame/
parsing.rs

1/// Traits required for parsing
2pub use crate::types::parsing::traits;
3use crate::{
4    frame::*,
5    protocol::{basic::parse_properties, *},
6    types::parsing::*,
7};
8use nom::{
9    Parser,
10    bytes::streaming::{tag, take},
11    combinator::{all_consuming, flat_map, map, map_opt, map_res},
12    error::context,
13};
14use traits::ParsableInput;
15
16/// Parse a channel id
17pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, AMQPChannel> {
18    context("parse_channel", map(parse_id, From::from)).parse(i)
19}
20
21/// Parse the protocol header
22pub fn parse_protocol_header<I: ParsableInput>(i: I) -> ParserResult<I, ProtocolVersion> {
23    context(
24        "parse_protocol_header",
25        map(
26            (
27                tag(&metadata::NAME.as_bytes()[1..]),
28                tag(&[0][..]),
29                parse_short_short_uint,
30                parse_short_short_uint,
31                parse_short_short_uint,
32            ),
33            |(_, _, major, minor, revision)| ProtocolVersion {
34                major,
35                minor,
36                revision,
37            },
38        ),
39    )
40    .parse(i)
41}
42
43/// Parse the frame type
44pub fn parse_frame_type<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrameType> {
45    context(
46        "parse_frame_type",
47        map_opt(parse_short_short_uint, |method| match method {
48            c if c == metadata::NAME.as_bytes()[0] => Some(AMQPFrameType::ProtocolHeader),
49            constants::FRAME_METHOD => Some(AMQPFrameType::Method),
50            constants::FRAME_HEADER => Some(AMQPFrameType::Header),
51            constants::FRAME_BODY => Some(AMQPFrameType::Body),
52            constants::FRAME_HEARTBEAT => Some(AMQPFrameType::Heartbeat),
53            _ => None,
54        }),
55    )
56    .parse(i)
57}
58
59/// Parse a full AMQP Frame (with contents)
60pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
61    context(
62        "parse_frame",
63        flat_map(parse_frame_type, move |frame_type| {
64            move |i: I| match frame_type {
65                AMQPFrameType::ProtocolHeader => {
66                    map(parse_protocol_header, AMQPFrame::ProtocolHeader).parse(i)
67                }
68                frame_type => map_res(
69                    parse_raw_frame(frame_type),
70                    |AMQPRawFrame {
71                         channel_id,
72                         frame_type,
73                         payload,
74                     }: AMQPRawFrame<I>| match frame_type {
75                        // This should be unreachable be better have a sensitive value anyways
76                        AMQPFrameType::ProtocolHeader => {
77                            Ok(AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1()))
78                        }
79                        AMQPFrameType::Method => all_consuming(parse_class)
80                            .parse(payload)
81                            .map(|(_, m)| AMQPFrame::Method(channel_id, m)),
82                        AMQPFrameType::Header => all_consuming(parse_content_header)
83                            .parse(payload)
84                            .map(|(_, h)| AMQPFrame::Header(channel_id, h.class_id, Box::new(h))),
85                        AMQPFrameType::Body => Ok(AMQPFrame::Body(
86                            channel_id,
87                            payload.iter_elements().collect(),
88                        )),
89                        AMQPFrameType::Heartbeat => Ok(AMQPFrame::Heartbeat(channel_id)),
90                    },
91                )
92                .parse(i),
93            }
94        }),
95    )
96    .parse(i)
97}
98
99/// Parse a raw AMQP frame
100pub fn parse_raw_frame<I: ParsableInput>(
101    frame_type: AMQPFrameType,
102) -> impl FnMut(I) -> ParserResult<I, AMQPRawFrame<I>> {
103    move |i: I| {
104        context(
105            "parse_raw_frame",
106            flat_map((parse_id, parse_long_uint), move |(channel_id, size)| {
107                map(
108                    (take(size), tag(&[constants::FRAME_END][..])),
109                    move |(payload, _)| AMQPRawFrame {
110                        frame_type,
111                        channel_id,
112                        payload,
113                    },
114                )
115            }),
116        )
117        .parse(i)
118    }
119}
120
121/// Parse a content header frame
122pub fn parse_content_header<I: ParsableInput>(i: I) -> ParserResult<I, AMQPContentHeader> {
123    context(
124        "parse_content_header",
125        map(
126            (
127                parse_id,
128                parse_short_uint,
129                parse_long_long_uint,
130                context("parse_properties", parse_properties),
131            ),
132            // FIXME: should we validate that weight is 0?
133            |(class_id, _weight, body_size, properties)| AMQPContentHeader {
134                class_id,
135                body_size,
136                properties,
137            },
138        ),
139    )
140    .parse(i)
141}
142
143#[cfg(test)]
144mod test {
145    use super::*;
146
147    #[test]
148    fn test_protocol_header() {
149        assert_eq!(
150            parse_frame(&[b'A', b'M', b'Q', b'P', 0, 0, 9, 1][..]),
151            Ok((
152                &[][..],
153                AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1())
154            ))
155        );
156    }
157
158    #[test]
159    fn test_heartbeat() {
160        assert_eq!(
161            parse_frame(&[8, 0, 1, 0, 0, 0, 0, 206][..]),
162            Ok((&[][..], AMQPFrame::Heartbeat(1)))
163        );
164    }
165
166    #[test]
167    fn test_parse_declare_queue_frame() {
168        let frame = AMQPFrame::Method(
169            1,
170            AMQPClass::Queue(queue::AMQPMethod::Declare(queue::Declare {
171                queue: "some_queue".into(),
172                passive: true,
173                durable: true,
174                exclusive: true,
175                auto_delete: true,
176                nowait: true,
177                arguments: Default::default(),
178            })),
179        );
180
181        let mut buffer = vec![0u8; 30];
182
183        assert!(gen_frame(&frame)(buffer.as_mut_slice().into()).is_ok());
184        assert_eq!(parse_frame(buffer.as_slice()), Ok((&[][..], frame)));
185    }
186}