1use crate::{
2 frame::{AMQPFrame, ProtocolVersion},
3 protocol::{basic::gen_properties, *},
4 types::{generation::*, *},
5};
6use cookie_factory::{combinator::slice, sequence::tuple};
7use std::io::Write;
8
9pub fn gen_frame<'a, W: Write + BackToTheBuffer + 'a>(
11 frame: &'a AMQPFrame,
12) -> impl SerializeFn<W> + 'a {
13 move |x| match frame {
14 AMQPFrame::ProtocolHeader(version) => gen_protocol_header(*version)(x),
15 AMQPFrame::Heartbeat(channel_id) => gen_heartbeat_frame(*channel_id)(x),
16 AMQPFrame::Method(channel_id, method) => gen_method_frame(*channel_id, method)(x),
17 AMQPFrame::Header(channel_id, class_id, header) => {
18 gen_content_header_frame(*channel_id, *class_id, header.body_size, &header.properties)(
19 x,
20 )
21 }
22 AMQPFrame::Body(channel_id, data) => gen_content_body_frame(*channel_id, data)(x),
23 }
24}
25
26fn gen_protocol_header<W: Write>(version: ProtocolVersion) -> impl SerializeFn<W> {
27 tuple((
28 slice(metadata::NAME.as_bytes()),
29 gen_short_short_uint(0),
30 gen_protocol_version(version),
31 ))
32}
33
34fn gen_protocol_version<W: Write>(version: ProtocolVersion) -> impl SerializeFn<W> {
35 tuple((
36 gen_short_short_uint(version.major),
37 gen_short_short_uint(version.minor),
38 gen_short_short_uint(version.revision),
39 ))
40}
41
42fn gen_heartbeat_frame<W: Write>(channel_id: ChannelId) -> impl SerializeFn<W> {
43 tuple((
44 gen_short_short_uint(constants::FRAME_HEARTBEAT),
45 gen_id(channel_id),
46 gen_long_uint(0),
47 gen_short_short_uint(constants::FRAME_END),
48 ))
49}
50
51fn gen_method_frame<'a, W: Write + BackToTheBuffer + 'a>(
52 channel_id: ChannelId,
53 class: &'a AMQPClass,
54) -> impl SerializeFn<W> + 'a {
55 tuple((
56 gen_short_short_uint(constants::FRAME_METHOD),
57 gen_id(channel_id),
58 gen_with_len(gen_class(class)),
59 gen_short_short_uint(constants::FRAME_END),
60 ))
61}
62
63fn gen_content_header_frame<'a, W: Write + BackToTheBuffer + 'a>(
64 channel_id: ChannelId,
65 class_id: Identifier,
66 length: PayloadSize,
67 properties: &'a basic::AMQPProperties,
68) -> impl SerializeFn<W> + 'a {
69 tuple((
70 gen_short_short_uint(constants::FRAME_HEADER),
71 gen_id(channel_id),
72 gen_with_len(tuple((
73 gen_id(class_id),
74 gen_short_uint(0 ),
75 gen_long_long_uint(length),
76 gen_properties(properties),
77 ))),
78 gen_short_short_uint(constants::FRAME_END),
79 ))
80}
81
82fn gen_content_body_frame<'a, W: Write + 'a>(
83 channel_id: ChannelId,
84 content: &'a [u8],
85) -> impl SerializeFn<W> + 'a {
86 tuple((
87 gen_short_short_uint(constants::FRAME_BODY),
88 gen_id(channel_id),
89 gen_long_uint(content.len() as ChunkSize),
90 slice(content),
91 gen_short_short_uint(constants::FRAME_END),
92 ))
93}
94
#[cfg(test)]
mod test {
    use super::*;

    /// Serializes a content header frame with default properties into a
    /// `Vec<u8>` and compares the result with the expected byte sequence.
    #[test]
    fn generate_header_frame() {
        use crate::{
            frame::{AMQPContentHeader, WriteContext},
            protocol::BasicProperties,
        };

        let content_header = AMQPContentHeader {
            class_id: 60,
            body_size: 5,
            properties: BasicProperties::default(),
        };
        // `class_id` is read before the header is moved into the box.
        let header = AMQPFrame::Header(1, content_header.class_id, Box::new(content_header));

        let serializer = gen_frame::<Vec<u8>>(&header);
        let (frame, _size) = serializer(WriteContext::from(Vec::<u8>::new()))
            .unwrap()
            .into_inner();
        println!("header: {:?}", header);
        println!("frame: {:?}", frame);

        let expected = [
            2, // frame type marker
            0, 1, // channel id
            0, 0, 0, 14, // payload length
            0, 60, // class id
            0, 0, // zero short-uint
            0, 0, 0, 0, 0, 0, 0, 5, // body size
            0, 0, // output of gen_properties for default properties
            206, // frame end marker
        ];

        assert_eq!(frame, expected);
    }
}