amq_protocol/frame/
parsing.rs

1/// Traits required for parsing
2pub use crate::types::parsing::traits;
3use crate::{
4    frame::*,
5    protocol::{basic::parse_properties, *},
6    types::parsing::*,
7};
8use nom::{
9    Parser,
10    bytes::streaming::{tag, take},
11    combinator::{all_consuming, flat_map, map, map_opt, map_res},
12    error::context,
13};
14use traits::ParsableInput;
15
16/// Parse a channel id
17pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, AMQPChannel> {
18    context("parse_channel", map(parse_id, From::from)).parse(i)
19}
20
21/// Parse the protocol header
22pub fn parse_protocol_header<I: ParsableInput>(i: I) -> ParserResult<I, ProtocolVersion> {
23    context(
24        "parse_protocol_header",
25        map(
26            (
27                tag(&metadata::NAME.as_bytes()[1..]),
28                tag(&[0][..]),
29                parse_short_short_uint,
30                parse_short_short_uint,
31                parse_short_short_uint,
32            ),
33            |(_, _, major, minor, revision)| ProtocolVersion {
34                major,
35                minor,
36                revision,
37            },
38        ),
39    )
40    .parse(i)
41}
42
43/// Parse the frame type
44pub fn parse_frame_type<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrameType> {
45    context(
46        "parse_frame_type",
47        map_opt(parse_short_short_uint, |method| match method {
48            c if c == metadata::NAME.as_bytes()[0] => Some(AMQPFrameType::ProtocolHeader),
49            constants::FRAME_METHOD => Some(AMQPFrameType::Method),
50            constants::FRAME_HEADER => Some(AMQPFrameType::Header),
51            constants::FRAME_BODY => Some(AMQPFrameType::Body),
52            constants::FRAME_HEARTBEAT => Some(AMQPFrameType::Heartbeat),
53            _ => None,
54        }),
55    )
56    .parse(i)
57}
58
59/// Parse a full AMQP Frame (with contents)
60pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
61    context(
62        "parse_frame",
63        flat_map(parse_frame_type, move |frame_type| {
64            move |i: I| match frame_type {
65                AMQPFrameType::ProtocolHeader => {
66                    map(parse_protocol_header, AMQPFrame::ProtocolHeader).parse(i)
67                }
68                frame_type => map_res(
69                    parse_raw_frame(frame_type),
70                    |AMQPRawFrame {
71                         channel_id,
72                         frame_type,
73                         payload,
74                     }: AMQPRawFrame<I>| match frame_type {
75                        // This should be unreachable be better have a sensitive value anyways
76                        AMQPFrameType::ProtocolHeader => {
77                            Ok(AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1()))
78                        }
79                        AMQPFrameType::Method => all_consuming(parse_class)
80                            .parse(payload)
81                            .map(|(_, m)| AMQPFrame::Method(channel_id, m)),
82                        AMQPFrameType::Header => all_consuming(parse_content_header)
83                            .parse(payload)
84                            .map(|(_, h)| AMQPFrame::Header(channel_id, h)),
85                        AMQPFrameType::Body => Ok(AMQPFrame::Body(
86                            channel_id,
87                            payload.iter_elements().collect(),
88                        )),
89                        AMQPFrameType::Heartbeat => Ok(if channel_id == 0 {
90                            AMQPFrame::Heartbeat
91                        } else {
92                            AMQPFrame::InvalidHeartbeat(channel_id)
93                        }),
94                    },
95                )
96                .parse(i),
97            }
98        }),
99    )
100    .parse(i)
101}
102
103/// Parse a raw AMQP frame
104pub fn parse_raw_frame<I: ParsableInput>(
105    frame_type: AMQPFrameType,
106) -> impl FnMut(I) -> ParserResult<I, AMQPRawFrame<I>> {
107    move |i: I| {
108        context(
109            "parse_raw_frame",
110            flat_map((parse_id, parse_long_uint), move |(channel_id, size)| {
111                map(
112                    (take(size), tag(&[constants::FRAME_END][..])),
113                    move |(payload, _)| AMQPRawFrame {
114                        frame_type,
115                        channel_id,
116                        payload,
117                    },
118                )
119            }),
120        )
121        .parse(i)
122    }
123}
124
125/// Parse a content header frame
126pub fn parse_content_header<I: ParsableInput>(i: I) -> ParserResult<I, AMQPContentHeader> {
127    context(
128        "parse_content_header",
129        map(
130            (
131                parse_id,
132                parse_short_uint,
133                parse_long_long_uint,
134                context("parse_properties", parse_properties),
135            ),
136            // FIXME: should we validate that weight is 0?
137            |(class_id, _weight, body_size, properties)| AMQPContentHeader {
138                class_id,
139                body_size,
140                properties,
141            },
142        ),
143    )
144    .parse(i)
145}
146
147#[cfg(test)]
148mod test {
149    use super::*;
150
151    #[test]
152    fn test_protocol_header() {
153        assert_eq!(
154            parse_frame(&[b'A', b'M', b'Q', b'P', 0, 0, 9, 1][..]),
155            Ok((
156                &[][..],
157                AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1())
158            ))
159        );
160    }
161
162    #[test]
163    fn test_heartbeat() {
164        assert_eq!(
165            parse_frame(&[8, 0, 0, 0, 0, 0, 0, 206][..]),
166            Ok((&[][..], AMQPFrame::Heartbeat))
167        );
168        assert_eq!(
169            parse_frame(&[8, 0, 1, 0, 0, 0, 0, 206][..]),
170            Ok((&[][..], AMQPFrame::InvalidHeartbeat(1)))
171        );
172    }
173
174    #[test]
175    fn test_parse_declare_queue_frame() {
176        let frame = AMQPFrame::Method(
177            1,
178            AMQPClass::Queue(queue::AMQPMethod::Declare(queue::Declare {
179                queue: "some_queue".into(),
180                passive: true,
181                durable: true,
182                exclusive: true,
183                auto_delete: true,
184                nowait: true,
185                arguments: Default::default(),
186            })),
187        );
188
189        let mut buffer = vec![0u8; 30];
190
191        assert!(gen_frame(&frame)(buffer.as_mut_slice().into()).is_ok());
192        assert_eq!(parse_frame(buffer.as_slice()), Ok((&[][..], frame)));
193    }
194}