amq_protocol/frame/
generation.rs1use crate::{
2 frame::{AMQPFrame, ProtocolVersion},
3 protocol::{basic::gen_properties, *},
4 types::{generation::*, *},
5};
6use cookie_factory::{combinator::slice, sequence::tuple};
7use std::io::Write;
8
9pub fn gen_frame<'a, W: Write + BackToTheBuffer + 'a>(
11 frame: &'a AMQPFrame,
12) -> impl SerializeFn<W> + 'a {
13 move |x| match frame {
14 AMQPFrame::ProtocolHeader(version) => gen_protocol_header(*version)(x),
15 AMQPFrame::Heartbeat(channel_id) => gen_heartbeat_frame(*channel_id)(x),
16 AMQPFrame::Method(channel_id, method) => gen_method_frame(*channel_id, method)(x),
17 AMQPFrame::Header(channel_id, class_id, header) => {
18 gen_content_header_frame(*channel_id, *class_id, header.body_size, &header.properties)(
19 x,
20 )
21 }
22 AMQPFrame::Body(channel_id, data) => gen_content_body_frame(*channel_id, data)(x),
23 }
24}
25
26fn gen_protocol_header<W: Write>(version: ProtocolVersion) -> impl SerializeFn<W> {
27 tuple((
28 slice(metadata::NAME.as_bytes()),
29 gen_short_short_uint(0),
30 gen_protocol_version(version),
31 ))
32}
33
34fn gen_protocol_version<W: Write>(version: ProtocolVersion) -> impl SerializeFn<W> {
35 tuple((
36 gen_short_short_uint(version.major),
37 gen_short_short_uint(version.minor),
38 gen_short_short_uint(version.revision),
39 ))
40}
41
42fn gen_heartbeat_frame<W: Write>(channel_id: ChannelId) -> impl SerializeFn<W> {
43 tuple((
44 gen_short_short_uint(constants::FRAME_HEARTBEAT),
45 gen_id(channel_id),
46 gen_long_uint(0),
47 gen_short_short_uint(constants::FRAME_END),
48 ))
49}
50
51fn gen_method_frame<'a, W: Write + BackToTheBuffer + 'a>(
52 channel_id: ChannelId,
53 class: &'a AMQPClass,
54) -> impl SerializeFn<W> + 'a {
55 tuple((
56 gen_short_short_uint(constants::FRAME_METHOD),
57 gen_id(channel_id),
58 gen_with_len(gen_class(class)),
59 gen_short_short_uint(constants::FRAME_END),
60 ))
61}
62
63fn gen_content_header_frame<'a, W: Write + BackToTheBuffer + 'a>(
64 channel_id: ChannelId,
65 class_id: Identifier,
66 length: PayloadSize,
67 properties: &'a basic::AMQPProperties,
68) -> impl SerializeFn<W> + 'a {
69 tuple((
70 gen_short_short_uint(constants::FRAME_HEADER),
71 gen_id(channel_id),
72 gen_with_len(tuple((
73 gen_id(class_id),
74 gen_short_uint(0 ),
75 gen_long_long_uint(length),
76 gen_properties(properties),
77 ))),
78 gen_short_short_uint(constants::FRAME_END),
79 ))
80}
81
82fn gen_content_body_frame<'a, W: Write + 'a>(
83 channel_id: ChannelId,
84 content: &'a [u8],
85) -> impl SerializeFn<W> + 'a {
86 tuple((
87 gen_short_short_uint(constants::FRAME_BODY),
88 gen_id(channel_id),
89 gen_long_uint(content.len() as ChunkSize),
90 slice(content),
91 gen_short_short_uint(constants::FRAME_END),
92 ))
93}