1pub use crate::types::parsing::traits;
3use crate::{
4 frame::*,
5 protocol::{basic::parse_properties, *},
6 types::parsing::*,
7};
8use nom::{
9 bytes::streaming::{tag, take},
10 combinator::{all_consuming, flat_map, map, map_opt, map_res},
11 error::context,
12 sequence::{pair, tuple},
13};
14use traits::ParsableInput;
15
16pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, AMQPChannel> {
18 context("parse_channel", map(parse_id, From::from))(i)
19}
20
21pub fn parse_protocol_header<I: ParsableInput>(i: I) -> ParserResult<I, ProtocolVersion> {
23 context(
24 "parse_protocol_header",
25 map(
26 tuple((
27 tag(&metadata::NAME.as_bytes()[1..]),
28 tag(&[0][..]),
29 parse_short_short_uint,
30 parse_short_short_uint,
31 parse_short_short_uint,
32 )),
33 |(_, _, major, minor, revision)| ProtocolVersion {
34 major,
35 minor,
36 revision,
37 },
38 ),
39 )(i)
40}
41
42pub fn parse_frame_type<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrameType> {
44 context(
45 "parse_frame_type",
46 map_opt(parse_short_short_uint, |method| match method {
47 c if c == metadata::NAME.as_bytes()[0] => Some(AMQPFrameType::ProtocolHeader),
48 constants::FRAME_METHOD => Some(AMQPFrameType::Method),
49 constants::FRAME_HEADER => Some(AMQPFrameType::Header),
50 constants::FRAME_BODY => Some(AMQPFrameType::Body),
51 constants::FRAME_HEARTBEAT => Some(AMQPFrameType::Heartbeat),
52 _ => None,
53 }),
54 )(i)
55}
56
57pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
59 context(
60 "parse_frame",
61 flat_map(parse_frame_type, move |frame_type| {
62 move |i: I| match frame_type {
63 AMQPFrameType::ProtocolHeader => {
64 map(parse_protocol_header, AMQPFrame::ProtocolHeader)(i)
65 }
66 frame_type => map_res(
67 parse_raw_frame(frame_type),
68 |AMQPRawFrame {
69 channel_id,
70 frame_type,
71 payload,
72 }: AMQPRawFrame<I>| match frame_type {
73 AMQPFrameType::ProtocolHeader => {
75 Ok(AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1()))
76 }
77 AMQPFrameType::Method => all_consuming(parse_class)(payload)
78 .map(|(_, m)| AMQPFrame::Method(channel_id, m)),
79 AMQPFrameType::Header => all_consuming(parse_content_header)(payload)
80 .map(|(_, h)| AMQPFrame::Header(channel_id, h.class_id, Box::new(h))),
81 AMQPFrameType::Body => Ok(AMQPFrame::Body(
82 channel_id,
83 payload.iter_elements().collect(),
84 )),
85 AMQPFrameType::Heartbeat => Ok(AMQPFrame::Heartbeat(channel_id)),
86 },
87 )(i),
88 }
89 }),
90 )(i)
91}
92
93pub fn parse_raw_frame<I: ParsableInput>(
95 frame_type: AMQPFrameType,
96) -> impl FnMut(I) -> ParserResult<I, AMQPRawFrame<I>> {
97 move |i: I| {
98 context(
99 "parse_raw_frame",
100 flat_map(
101 pair(parse_id, parse_long_uint),
102 move |(channel_id, size)| {
103 map(
104 pair(take(size), tag(&[constants::FRAME_END][..])),
105 move |(payload, _)| AMQPRawFrame {
106 frame_type,
107 channel_id,
108 payload,
109 },
110 )
111 },
112 ),
113 )(i)
114 }
115}
116
117pub fn parse_content_header<I: ParsableInput>(i: I) -> ParserResult<I, AMQPContentHeader> {
119 context(
120 "parse_content_header",
121 map(
122 tuple((
123 parse_id,
124 parse_short_uint,
125 parse_long_long_uint,
126 context("parse_properties", parse_properties),
127 )),
128 |(class_id, _weight, body_size, properties)| AMQPContentHeader {
130 class_id,
131 body_size,
132 properties,
133 },
134 ),
135 )(i)
136}
137
#[cfg(test)]
mod test {
    use super::*;

    /// A protocol header for version 0.9.1 is decoded and fully consumed.
    #[test]
    fn test_protocol_header() {
        let input = &b"AMQP\x00\x00\x09\x01"[..];
        let expected = AMQPFrame::ProtocolHeader(ProtocolVersion::amqp_0_9_1());

        assert_eq!(parse_frame(input), Ok((&[][..], expected)));
    }

    /// A heartbeat frame on channel 1 is decoded and fully consumed.
    #[test]
    fn test_heartbeat() {
        // In the order `parse_frame` consumes them: frame type, channel id,
        // payload size (zero, so no payload), frame end marker.
        let input: &[u8] = &[8, 0, 1, 0, 0, 0, 0, 206];

        assert_eq!(parse_frame(input), Ok((&[][..], AMQPFrame::Heartbeat(1))));
    }

    /// A generated `queue.declare` method frame parses back to the same frame.
    #[test]
    fn test_parse_declare_queue_frame() {
        let declare = queue::Declare {
            queue: "some_queue".into(),
            passive: true,
            durable: true,
            exclusive: true,
            auto_delete: true,
            nowait: true,
            arguments: Default::default(),
        };
        let frame = AMQPFrame::Method(
            1,
            AMQPClass::Queue(queue::AMQPMethod::Declare(declare)),
        );

        // The assertion below expects no input left after parsing, so the
        // serialized frame presumably fills all 30 bytes.
        let mut buffer = vec![0u8; 30];

        // Kept as one expression so the mutable borrow of `buffer` ends
        // before it is read back.
        assert!(gen_frame(&frame)(buffer.as_mut_slice().into()).is_ok());
        assert_eq!(parse_frame(buffer.as_slice()), Ok((&[][..], frame)));
    }
}
181}