amq_protocol/frame/
parsing.rs

1/// Traits required for parsing
2pub use crate::types::parsing::traits;
3use crate::{
4    frame::*,
5    protocol::{basic::parse_properties, *},
6    types::parsing::*,
7};
8use nom::{
9    bytes::streaming::{tag, take},
10    combinator::{all_consuming, flat_map, map, map_opt, map_res},
11    error::context,
12    sequence::{pair, tuple},
13};
14use traits::ParsableInput;
15
16/// Parse a channel id
17pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, AMQPChannel> {
18    context("parse_channel", map(parse_id, From::from))(i)
19}
20
21/// Parse the protocol header
22pub fn parse_protocol_header<I: ParsableInput>(i: I) -> ParserResult<I, ProtocolVersion> {
23    context(
24        "parse_protocol_header",
25        map(
26            tuple((
27                tag(&metadata::NAME.as_bytes()[1..]),
28                tag(&[0][..]),
29                parse_short_short_uint,
30                parse_short_short_uint,
31                parse_short_short_uint,
32            )),
33            |(_, _, major, minor, revision)| ProtocolVersion {
34                major,
35                minor,
36                revision,
37            },
38        ),
39    )(i)
40}
41
42/// Parse the frame type
43pub fn parse_frame_type<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrameType> {
44    context(
45        "parse_frame_type",
46        map_opt(parse_short_short_uint, |method| match method {
47            c if c == metadata::NAME.as_bytes()[0] => Some(AMQPFrameType::ProtocolHeader),
48            constants::FRAME_METHOD => Some(AMQPFrameType::Method),
49            constants::FRAME_HEADER => Some(AMQPFrameType::Header),
50            constants::FRAME_BODY => Some(AMQPFrameType::Body),
51            constants::FRAME_HEARTBEAT => Some(AMQPFrameType::Heartbeat),
52            _ => None,
53        }),
54    )(i)
55}
56
57/// Parse a full AMQP Frame (with contents)
58pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
59    context(
60        "parse_frame",
61        flat_map(parse_frame_type, move |frame_type| {
62            move |i: I| match frame_type {
63                AMQPFrameType::ProtocolHeader => {
64                    map(parse_protocol_header, AMQPFrame::ProtocolHeader)(i)
65                }
66                frame_type => map_res(
67                    parse_raw_frame(frame_type),
68                    |AMQPRawFrame {
69                         channel_id,
70                         frame_type,
71                         payload,
72                     }: AMQPRawFrame<I>| match frame_type {
73                        // This should be unreachable be better have a sensitive value anyways
74                        AMQPFrameType::ProtocolHeader => {
75                            Ok(AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1()))
76                        }
77                        AMQPFrameType::Method => all_consuming(parse_class)(payload)
78                            .map(|(_, m)| AMQPFrame::Method(channel_id, m)),
79                        AMQPFrameType::Header => all_consuming(parse_content_header)(payload)
80                            .map(|(_, h)| AMQPFrame::Header(channel_id, h.class_id, Box::new(h))),
81                        AMQPFrameType::Body => Ok(AMQPFrame::Body(
82                            channel_id,
83                            payload.iter_elements().collect(),
84                        )),
85                        AMQPFrameType::Heartbeat => Ok(AMQPFrame::Heartbeat(channel_id)),
86                    },
87                )(i),
88            }
89        }),
90    )(i)
91}
92
93/// Parse a raw AMQP frame
94pub fn parse_raw_frame<I: ParsableInput>(
95    frame_type: AMQPFrameType,
96) -> impl FnMut(I) -> ParserResult<I, AMQPRawFrame<I>> {
97    move |i: I| {
98        context(
99            "parse_raw_frame",
100            flat_map(
101                pair(parse_id, parse_long_uint),
102                move |(channel_id, size)| {
103                    map(
104                        pair(take(size), tag(&[constants::FRAME_END][..])),
105                        move |(payload, _)| AMQPRawFrame {
106                            frame_type,
107                            channel_id,
108                            payload,
109                        },
110                    )
111                },
112            ),
113        )(i)
114    }
115}
116
117/// Parse a content header frame
118pub fn parse_content_header<I: ParsableInput>(i: I) -> ParserResult<I, AMQPContentHeader> {
119    context(
120        "parse_content_header",
121        map(
122            tuple((
123                parse_id,
124                parse_short_uint,
125                parse_long_long_uint,
126                context("parse_properties", parse_properties),
127            )),
128            // FIXME: should we validate that weight is 0?
129            |(class_id, _weight, body_size, properties)| AMQPContentHeader {
130                class_id,
131                body_size,
132                properties,
133            },
134        ),
135    )(i)
136}
137
138#[cfg(test)]
139mod test {
140    use super::*;
141
142    #[test]
143    fn test_protocol_header() {
144        assert_eq!(
145            parse_frame(&[b'A', b'M', b'Q', b'P', 0, 0, 9, 1][..]),
146            Ok((
147                &[][..],
148                AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1())
149            ))
150        );
151    }
152
153    #[test]
154    fn test_heartbeat() {
155        assert_eq!(
156            parse_frame(&[8, 0, 1, 0, 0, 0, 0, 206][..]),
157            Ok((&[][..], AMQPFrame::Heartbeat(1)))
158        );
159    }
160
161    #[test]
162    fn test_parse_declare_queue_frame() {
163        let frame = AMQPFrame::Method(
164            1,
165            AMQPClass::Queue(queue::AMQPMethod::Declare(queue::Declare {
166                queue: "some_queue".into(),
167                passive: true,
168                durable: true,
169                exclusive: true,
170                auto_delete: true,
171                nowait: true,
172                arguments: Default::default(),
173            })),
174        );
175
176        let mut buffer = vec![0u8; 30];
177
178        assert!(gen_frame(&frame)(buffer.as_mut_slice().into()).is_ok());
179        assert_eq!(parse_frame(buffer.as_slice()), Ok((&[][..], frame)));
180    }
181}