1pub use crate::types::parsing::traits;
3use crate::{
4 frame::*,
5 protocol::{basic::parse_properties, *},
6 types::parsing::*,
7};
8use nom::{
9 Parser,
10 bytes::streaming::{tag, take},
11 combinator::{all_consuming, flat_map, map, map_opt, map_res},
12 error::context,
13};
14use traits::ParsableInput;
15
16pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, AMQPChannel> {
18 context("parse_channel", map(parse_id, From::from)).parse(i)
19}
20
21pub fn parse_protocol_header<I: ParsableInput>(i: I) -> ParserResult<I, ProtocolVersion> {
23 context(
24 "parse_protocol_header",
25 map(
26 (
27 tag(&metadata::NAME.as_bytes()[1..]),
28 tag(&[0][..]),
29 parse_short_short_uint,
30 parse_short_short_uint,
31 parse_short_short_uint,
32 ),
33 |(_, _, major, minor, revision)| ProtocolVersion {
34 major,
35 minor,
36 revision,
37 },
38 ),
39 )
40 .parse(i)
41}
42
43pub fn parse_frame_type<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrameType> {
45 context(
46 "parse_frame_type",
47 map_opt(parse_short_short_uint, |method| match method {
48 c if c == metadata::NAME.as_bytes()[0] => Some(AMQPFrameType::ProtocolHeader),
49 constants::FRAME_METHOD => Some(AMQPFrameType::Method),
50 constants::FRAME_HEADER => Some(AMQPFrameType::Header),
51 constants::FRAME_BODY => Some(AMQPFrameType::Body),
52 constants::FRAME_HEARTBEAT => Some(AMQPFrameType::Heartbeat),
53 _ => None,
54 }),
55 )
56 .parse(i)
57}
58
59pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
61 context(
62 "parse_frame",
63 flat_map(parse_frame_type, move |frame_type| {
64 move |i: I| match frame_type {
65 AMQPFrameType::ProtocolHeader => {
66 map(parse_protocol_header, AMQPFrame::ProtocolHeader).parse(i)
67 }
68 frame_type => map_res(
69 parse_raw_frame(frame_type),
70 |AMQPRawFrame {
71 channel_id,
72 frame_type,
73 payload,
74 }: AMQPRawFrame<I>| match frame_type {
75 AMQPFrameType::ProtocolHeader => {
77 Ok(AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1()))
78 }
79 AMQPFrameType::Method => all_consuming(parse_class)
80 .parse(payload)
81 .map(|(_, m)| AMQPFrame::Method(channel_id, m)),
82 AMQPFrameType::Header => all_consuming(parse_content_header)
83 .parse(payload)
84 .map(|(_, h)| AMQPFrame::Header(channel_id, h)),
85 AMQPFrameType::Body => Ok(AMQPFrame::Body(
86 channel_id,
87 payload.iter_elements().collect(),
88 )),
89 AMQPFrameType::Heartbeat => Ok(if channel_id == 0 {
90 AMQPFrame::Heartbeat
91 } else {
92 AMQPFrame::InvalidHeartbeat(channel_id)
93 }),
94 },
95 )
96 .parse(i),
97 }
98 }),
99 )
100 .parse(i)
101}
102
103pub fn parse_raw_frame<I: ParsableInput>(
105 frame_type: AMQPFrameType,
106) -> impl FnMut(I) -> ParserResult<I, AMQPRawFrame<I>> {
107 move |i: I| {
108 context(
109 "parse_raw_frame",
110 flat_map((parse_id, parse_long_uint), move |(channel_id, size)| {
111 map(
112 (take(size), tag(&[constants::FRAME_END][..])),
113 move |(payload, _)| AMQPRawFrame {
114 frame_type,
115 channel_id,
116 payload,
117 },
118 )
119 }),
120 )
121 .parse(i)
122 }
123}
124
125pub fn parse_content_header<I: ParsableInput>(i: I) -> ParserResult<I, AMQPContentHeader> {
127 context(
128 "parse_content_header",
129 map(
130 (
131 parse_id,
132 parse_short_uint,
133 parse_long_long_uint,
134 context("parse_properties", parse_properties),
135 ),
136 |(class_id, _weight, body_size, properties)| AMQPContentHeader {
138 class_id,
139 body_size,
140 properties,
141 },
142 ),
143 )
144 .parse(i)
145}
146
147#[cfg(test)]
148mod test {
149 use super::*;
150
151 #[test]
152 fn test_protocol_header() {
153 assert_eq!(
154 parse_frame(&[b'A', b'M', b'Q', b'P', 0, 0, 9, 1][..]),
155 Ok((
156 &[][..],
157 AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1())
158 ))
159 );
160 }
161
162 #[test]
163 fn test_heartbeat() {
164 assert_eq!(
165 parse_frame(&[8, 0, 0, 0, 0, 0, 0, 206][..]),
166 Ok((&[][..], AMQPFrame::Heartbeat))
167 );
168 assert_eq!(
169 parse_frame(&[8, 0, 1, 0, 0, 0, 0, 206][..]),
170 Ok((&[][..], AMQPFrame::InvalidHeartbeat(1)))
171 );
172 }
173
174 #[test]
175 fn test_parse_declare_queue_frame() {
176 let frame = AMQPFrame::Method(
177 1,
178 AMQPClass::Queue(queue::AMQPMethod::Declare(queue::Declare {
179 queue: "some_queue".into(),
180 passive: true,
181 durable: true,
182 exclusive: true,
183 auto_delete: true,
184 nowait: true,
185 arguments: Default::default(),
186 })),
187 );
188
189 let mut buffer = vec![0u8; 30];
190
191 assert!(gen_frame(&frame)(buffer.as_mut_slice().into()).is_ok());
192 assert_eq!(parse_frame(buffer.as_slice()), Ok((&[][..], frame)));
193 }
194}