1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
pub mod codec;
pub mod frame;
#[macro_use]
extern crate bitflags;
use std::fmt;
/// Crate-wide error type: any boxed `std::error::Error` that is also `Send + Sync`.
pub type Error = Box<dyn std::error::Error + Send + Sync>;
/// Crate-wide result alias whose error variant is the boxed `Error` alias above.
pub type Result<T> = std::result::Result<T, Error>;
/// Error value pairing a numeric code with a textual message.
///
/// Implements `std::error::Error`, so it can be boxed into a
/// `Box<dyn std::error::Error + Send + Sync>`.
#[derive(Debug)]
pub struct FrameError {
    /// Numeric error code.
    pub code: u16,
    /// Text describing the error.
    pub message: String,
}

impl fmt::Display for FrameError {
    /// Writes the error using its derived `Debug` representation.
    ///
    /// The text is produced from a fresh `{:?}` format specification, so the
    /// `Display` output is always the compact, single-line `Debug` form.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_fmt(format_args!("{:?}", self))
    }
}

// No methods are overridden: the default `std::error::Error` behaviour is used.
impl std::error::Error for FrameError {}
/// Builds an `Err` value holding a boxed `FrameError`.
///
/// * `$code` – expression stored in the error's `code` field.
/// * `$message` – expression passed to `String::from` and stored in `message`.
///
/// The expansion is an expression of the form `Err(Box::new(FrameError { .. }))`;
/// where the surrounding code expects a boxed `dyn Error`, the box can coerce
/// to that trait object.
#[macro_export]
macro_rules! frame_error {
    ($code:expr, $message:expr) => {
        // Every path is absolute (including `Box`) so the expansion does not
        // depend on which names happen to be in scope at the call site.
        ::std::result::Result::Err(::std::boxed::Box::new($crate::FrameError {
            code: $code,
            message: ::std::string::String::from($message),
        }))
    };
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::{Buf, BufMut, BytesMut};
    use codec::{AMQPCodec, Frame};
    use frame::{AMQPFrame, MethodFrameArgs};
    use tokio_util::codec::Encoder;

    /// Encoding `AMQPFrame::Header` must write the 8-byte protocol header
    /// `"AMQP" 0 0 9 1`.
    #[test]
    fn encode_header_frame() {
        let mut encoder = AMQPCodec {};
        let mut buf = BytesMut::with_capacity(1024);

        let res = encoder.encode(Frame::Frame(AMQPFrame::Header), &mut buf);
        assert!(res.is_ok());

        let expected = b"AMQP\x00\x00\x09\x01";
        // Pull exactly the first eight encoded bytes out of the buffer.
        let mut current = [0u8; 8];
        buf.copy_to_slice(&mut current[..]);

        assert_eq!(expected, &current);
    }

    /// Encoding a `QueueBind` method frame on channel `0x0205` must produce the
    /// frame header, a big-endian payload length, the payload and the `0xCE`
    /// trailing byte.
    #[test]
    fn encode_method_frame() {
        let mut encoder = AMQPCodec {};
        let mut buf = BytesMut::with_capacity(1024);

        let args = frame::QueueBindArgs {
            queue_name: "queue".into(),
            exchange_name: "exchg".into(),
            routing_key: "key".into(),
            no_wait: false,
            args: None,
        };

        let res = encoder.encode(
            Frame::Frame(AMQPFrame::Method(
                0x0205,
                frame::QUEUE_BIND,
                MethodFrameArgs::QueueBind(args),
            )),
            &mut buf,
        );
        assert!(res.is_ok());

        // Frame type 0x01 followed by the two-byte channel number 0x0205.
        let frame_header = b"\x01\x02\x05";
        // Class id 0x0032 and method id 0x0014 (queue.bind in AMQP 0-9-1).
        let class_method = b"\x00\x32\x00\x14";

        // Build the expected payload separately so its length can be prefixed.
        let mut argbuf = BytesMut::with_capacity(256);
        argbuf.put(&class_method[..]);
        // Two zero bytes preceding the names (reserved field in queue.bind).
        argbuf.put(&b"\x00\x00"[..]);
        // Length-prefixed strings: queue name, exchange name, routing key.
        argbuf.put(&b"\x05queue"[..]);
        argbuf.put(&b"\x05exchg"[..]);
        argbuf.put(&b"\x03key"[..]);
        // Flag byte, all bits clear because `no_wait` is false.
        argbuf.put(&b"\x00"[..]);
        // Four zero bytes: a zero-length argument table for `args: None`.
        argbuf.put(&b"\x00\x00\x00\x00"[..]);

        let mut expected = BytesMut::with_capacity(256);
        expected.put(&frame_header[..]);
        expected.put_u32(argbuf.len() as u32);
        expected.put(argbuf);
        // Frame-end marker byte.
        expected.put_u8(0xCE);

        assert_eq!(expected, buf);
    }
}