1pub use crate::types::parsing::traits;
3use crate::{
4 frame::*,
5 protocol::{basic::parse_properties, *},
6 types::parsing::*,
7};
8use nom::{
9 Parser,
10 bytes::streaming::{tag, take},
11 combinator::{all_consuming, flat_map, map, map_opt, map_res},
12 error::context,
13};
14use traits::ParsableInput;
15
16pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, AMQPChannel> {
18 context("parse_channel", map(parse_id, From::from)).parse(i)
19}
20
21pub fn parse_protocol_header<I: ParsableInput>(i: I) -> ParserResult<I, ProtocolVersion> {
23 context(
24 "parse_protocol_header",
25 map(
26 (
27 tag(&metadata::NAME.as_bytes()[1..]),
28 tag(&[0][..]),
29 parse_short_short_uint,
30 parse_short_short_uint,
31 parse_short_short_uint,
32 ),
33 |(_, _, major, minor, revision)| ProtocolVersion {
34 major,
35 minor,
36 revision,
37 },
38 ),
39 )
40 .parse(i)
41}
42
43pub fn parse_frame_type<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrameType> {
45 context(
46 "parse_frame_type",
47 map_opt(parse_short_short_uint, |method| match method {
48 c if c == metadata::NAME.as_bytes()[0] => Some(AMQPFrameType::ProtocolHeader),
49 constants::FRAME_METHOD => Some(AMQPFrameType::Method),
50 constants::FRAME_HEADER => Some(AMQPFrameType::Header),
51 constants::FRAME_BODY => Some(AMQPFrameType::Body),
52 constants::FRAME_HEARTBEAT => Some(AMQPFrameType::Heartbeat),
53 _ => None,
54 }),
55 )
56 .parse(i)
57}
58
59pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
61 context(
62 "parse_frame",
63 flat_map(parse_frame_type, move |frame_type| {
64 move |i: I| match frame_type {
65 AMQPFrameType::ProtocolHeader => {
66 map(parse_protocol_header, AMQPFrame::ProtocolHeader).parse(i)
67 }
68 frame_type => map_res(
69 parse_raw_frame(frame_type),
70 |AMQPRawFrame {
71 channel_id,
72 frame_type,
73 payload,
74 }: AMQPRawFrame<I>| match frame_type {
75 AMQPFrameType::ProtocolHeader => {
77 Ok(AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1()))
78 }
79 AMQPFrameType::Method => all_consuming(parse_class)
80 .parse(payload)
81 .map(|(_, m)| AMQPFrame::Method(channel_id, m)),
82 AMQPFrameType::Header => all_consuming(parse_content_header)
83 .parse(payload)
84 .map(|(_, h)| AMQPFrame::Header(channel_id, h.class_id, Box::new(h))),
85 AMQPFrameType::Body => Ok(AMQPFrame::Body(
86 channel_id,
87 payload.iter_elements().collect(),
88 )),
89 AMQPFrameType::Heartbeat => Ok(AMQPFrame::Heartbeat(channel_id)),
90 },
91 )
92 .parse(i),
93 }
94 }),
95 )
96 .parse(i)
97}
98
99pub fn parse_raw_frame<I: ParsableInput>(
101 frame_type: AMQPFrameType,
102) -> impl FnMut(I) -> ParserResult<I, AMQPRawFrame<I>> {
103 move |i: I| {
104 context(
105 "parse_raw_frame",
106 flat_map((parse_id, parse_long_uint), move |(channel_id, size)| {
107 map(
108 (take(size), tag(&[constants::FRAME_END][..])),
109 move |(payload, _)| AMQPRawFrame {
110 frame_type,
111 channel_id,
112 payload,
113 },
114 )
115 }),
116 )
117 .parse(i)
118 }
119}
120
121pub fn parse_content_header<I: ParsableInput>(i: I) -> ParserResult<I, AMQPContentHeader> {
123 context(
124 "parse_content_header",
125 map(
126 (
127 parse_id,
128 parse_short_uint,
129 parse_long_long_uint,
130 context("parse_properties", parse_properties),
131 ),
132 |(class_id, _weight, body_size, properties)| AMQPContentHeader {
134 class_id,
135 body_size,
136 properties,
137 },
138 ),
139 )
140 .parse(i)
141}
142
143#[cfg(test)]
144mod test {
145 use super::*;
146
147 #[test]
148 fn test_protocol_header() {
149 assert_eq!(
150 parse_frame(&[b'A', b'M', b'Q', b'P', 0, 0, 9, 1][..]),
151 Ok((
152 &[][..],
153 AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1())
154 ))
155 );
156 }
157
158 #[test]
159 fn test_heartbeat() {
160 assert_eq!(
161 parse_frame(&[8, 0, 1, 0, 0, 0, 0, 206][..]),
162 Ok((&[][..], AMQPFrame::Heartbeat(1)))
163 );
164 }
165
166 #[test]
167 fn test_parse_declare_queue_frame() {
168 let frame = AMQPFrame::Method(
169 1,
170 AMQPClass::Queue(queue::AMQPMethod::Declare(queue::Declare {
171 queue: "some_queue".into(),
172 passive: true,
173 durable: true,
174 exclusive: true,
175 auto_delete: true,
176 nowait: true,
177 arguments: Default::default(),
178 })),
179 );
180
181 let mut buffer = vec![0u8; 30];
182
183 assert!(gen_frame(&frame)(buffer.as_mut_slice().into()).is_ok());
184 assert_eq!(parse_frame(buffer.as_slice()), Ok((&[][..], frame)));
185 }
186}