1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/// Traits required for parsing
pub use crate::types::parsing::traits;
use crate::{
    frame::*,
    protocol::{basic::parse_properties, *},
    types::parsing::*,
};
use nom::{
    bytes::streaming::{tag, take},
    combinator::{all_consuming, flat_map, map, map_opt, map_res},
    error::context,
    sequence::{pair, tuple},
};
use traits::ParsableInput;

/// Parse a channel id
pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, AMQPChannel> {
    context("parse_channel", map(parse_id, From::from))(i)
}

/// Parse the protocol header
pub fn parse_protocol_header<I: ParsableInput>(i: I) -> ParserResult<I, ()> {
    context(
        "parse_protocol_header",
        map(
            pair(
                tag(metadata::NAME.as_bytes()),
                tag(&[
                    0,
                    metadata::MAJOR_VERSION,
                    metadata::MINOR_VERSION,
                    metadata::REVISION,
                ][..]),
            ),
            |_| (),
        ),
    )(i)
}

/// Parse the frame type
pub fn parse_frame_type<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrameType> {
    context(
        "parse_frame_type",
        map_opt(parse_short_short_uint, |method| match method {
            constants::FRAME_METHOD => Some(AMQPFrameType::Method),
            constants::FRAME_HEADER => Some(AMQPFrameType::Header),
            constants::FRAME_BODY => Some(AMQPFrameType::Body),
            constants::FRAME_HEARTBEAT => Some(AMQPFrameType::Heartbeat),
            _ => None,
        }),
    )(i)
}

/// Parse a full AMQP Frame (with contents)
pub fn parse_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPFrame> {
    context(
        "parse_frame",
        map_res(
            parse_raw_frame,
            |AMQPRawFrame {
                 channel_id,
                 frame_type,
                 payload,
             }: AMQPRawFrame<I>| match frame_type {
                AMQPFrameType::Method => all_consuming(parse_class)(payload)
                    .map(|(_, m)| AMQPFrame::Method(channel_id, m)),
                AMQPFrameType::Header => all_consuming(parse_content_header)(payload)
                    .map(|(_, h)| AMQPFrame::Header(channel_id, h.class_id, Box::new(h))),
                AMQPFrameType::Body => Ok(AMQPFrame::Body(
                    channel_id,
                    payload.iter_elements().collect(),
                )),
                AMQPFrameType::Heartbeat => Ok(AMQPFrame::Heartbeat(channel_id)),
            },
        ),
    )(i)
}

/// Parse a raw AMQP frame
pub fn parse_raw_frame<I: ParsableInput>(i: I) -> ParserResult<I, AMQPRawFrame<I>> {
    context(
        "parse_raw_frame",
        flat_map(
            tuple((parse_frame_type, parse_id, parse_long_uint)),
            move |(frame_type, channel_id, size)| {
                map(
                    pair(take(size), tag(&[constants::FRAME_END][..])),
                    move |(payload, _)| AMQPRawFrame {
                        frame_type,
                        channel_id,
                        payload,
                    },
                )
            },
        ),
    )(i)
}

/// Parse a content header frame
pub fn parse_content_header<I: ParsableInput>(i: I) -> ParserResult<I, AMQPContentHeader> {
    context(
        "parse_content_header",
        map(
            tuple((
                parse_id,
                parse_short_uint,
                parse_long_long_uint,
                context("parse_propertes", parse_properties),
            )),
            |(class_id, weight, body_size, properties)| AMQPContentHeader {
                class_id,
                weight,
                body_size,
                properties,
            },
        ),
    )(i)
}