use framing::{FrameType, Frame};
use amqp_error::{AMQPError, AMQPResult};
use std::io::{Read, Write};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
/// Common interface of the typed AMQP method structs defined in this file.
///
/// An implementor converts between its typed fields and the raw argument bytes
/// carried by a `MethodFrame`.
pub trait Method: Sized {
/// Builds the typed method from an already-parsed method frame.
fn decode(method_frame: MethodFrame) -> AMQPResult<Self>;
/// Serializes the method arguments only; `MethodFrame::encode_method` adds the
/// class id and method id in front of them.
fn encode(&self) -> AMQPResult<Vec<u8>>;
/// Dotted method name, e.g. `"connection.start"`.
fn name(&self) -> &'static str;
/// Id of the method within its class.
fn id(&self) -> u16;
/// Id of the class the method belongs to.
fn class_id(&self) -> u16;
}
/// Payload of a method frame, split into class id, method id and the still-encoded
/// method arguments.
#[derive(Debug, Clone)]
pub struct MethodFrame {
    /// Class id: the first big-endian u16 of the payload.
    pub class_id: u16,
    /// Method id: the second big-endian u16 of the payload.
    pub method_id: u16,
    /// Everything after the two ids, left in wire encoding.
    pub arguments: Vec<u8>,
}

impl MethodFrame {
    /// Serializes `method` into a full method-frame payload (ids followed by the
    /// arguments produced by `method.encode()`).
    pub fn encode_method<T>(method: &T) -> AMQPResult<Vec<u8>>
        where T: Method
    {
        let class_id = method.class_id();
        let method_id = method.id();
        let arguments = try!(method.encode());
        let payload = MethodFrame {
            class_id: class_id,
            method_id: method_id,
            arguments: arguments,
        };
        payload.encode()
    }

    /// Writes `class_id`, `method_id` (both big-endian u16) and then the raw arguments.
    pub fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_u16::<BigEndian>(self.class_id));
        try!(payload.write_u16::<BigEndian>(self.method_id));
        try!(payload.write_all(&self.arguments));
        Ok(payload)
    }

    /// Splits the payload of `frame` into ids and arguments.
    ///
    /// Returns a `DecodeError` when `frame` is not of type `FrameType::METHOD`, and
    /// propagates the read error when the payload is shorter than the two ids.
    pub fn decode(frame: &Frame) -> AMQPResult<MethodFrame> {
        if frame.frame_type == FrameType::METHOD {
            let mut cursor = &frame.payload[..];
            let class_id = try!(cursor.read_u16::<BigEndian>());
            let method_id = try!(cursor.read_u16::<BigEndian>());
            let mut arguments = Vec::new();
            try!(cursor.read_to_end(&mut arguments));
            Ok(MethodFrame {
                class_id: class_id,
                method_id: method_id,
                arguments: arguments,
            })
        } else {
            Err(AMQPError::DecodeError("Not a method frame"))
        }
    }

    /// Maps the (class id, method id) pair to its dotted method name, or
    /// `"UNKNOWN"` when the pair is not in the table.
    pub fn method_name(&self) -> &'static str {
        match self.class_id {
            10 => match self.method_id {
                10 => "connection.start",
                11 => "connection.start-ok",
                20 => "connection.secure",
                21 => "connection.secure-ok",
                30 => "connection.tune",
                31 => "connection.tune-ok",
                40 => "connection.open",
                41 => "connection.open-ok",
                50 => "connection.close",
                51 => "connection.close-ok",
                60 => "connection.blocked",
                61 => "connection.unblocked",
                _ => "UNKNOWN",
            },
            20 => match self.method_id {
                10 => "channel.open",
                11 => "channel.open-ok",
                20 => "channel.flow",
                21 => "channel.flow-ok",
                40 => "channel.close",
                41 => "channel.close-ok",
                _ => "UNKNOWN",
            },
            30 => match self.method_id {
                10 => "access.request",
                11 => "access.request-ok",
                _ => "UNKNOWN",
            },
            40 => match self.method_id {
                10 => "exchange.declare",
                11 => "exchange.declare-ok",
                20 => "exchange.delete",
                21 => "exchange.delete-ok",
                30 => "exchange.bind",
                31 => "exchange.bind-ok",
                40 => "exchange.unbind",
                51 => "exchange.unbind-ok",
                _ => "UNKNOWN",
            },
            50 => match self.method_id {
                10 => "queue.declare",
                11 => "queue.declare-ok",
                20 => "queue.bind",
                21 => "queue.bind-ok",
                30 => "queue.purge",
                31 => "queue.purge-ok",
                40 => "queue.delete",
                41 => "queue.delete-ok",
                50 => "queue.unbind",
                51 => "queue.unbind-ok",
                _ => "UNKNOWN",
            },
            60 => match self.method_id {
                10 => "basic.qos",
                11 => "basic.qos-ok",
                20 => "basic.consume",
                21 => "basic.consume-ok",
                30 => "basic.cancel",
                31 => "basic.cancel-ok",
                40 => "basic.publish",
                50 => "basic.return",
                60 => "basic.deliver",
                70 => "basic.get",
                71 => "basic.get-ok",
                72 => "basic.get-empty",
                80 => "basic.ack",
                90 => "basic.reject",
                100 => "basic.recover-async",
                110 => "basic.recover",
                111 => "basic.recover-ok",
                120 => "basic.nack",
                _ => "UNKNOWN",
            },
            85 => match self.method_id {
                10 => "confirm.select",
                11 => "confirm.select-ok",
                _ => "UNKNOWN",
            },
            90 => match self.method_id {
                10 => "tx.select",
                11 => "tx.select-ok",
                20 => "tx.commit",
                21 => "tx.commit-ok",
                30 => "tx.rollback",
                31 => "tx.rollback-ok",
                _ => "UNKNOWN",
            },
            _ => "UNKNOWN",
        }
    }
}
#[allow(unused_imports)]
#[allow(missing_copy_implementations)]
pub mod connection {
use bit_vec::BitVec;
use table;
use table::{Table, decode_table, encode_table};
use protocol::{Method, MethodFrame};
use framing::ContentHeaderFrame;
use amqp_error::{AMQPResult, AMQPError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};
/// `connection.start` (class 10, method 10).
///
/// Argument layout on the wire, in order: `version_major` (u8), `version_minor`
/// (u8), `server_properties` (table), then `mechanisms` and `locales`, each a
/// big-endian u32 length followed by that many bytes.
#[derive(Debug)]
pub struct Start {
    pub version_major: u8,
    pub version_minor: u8,
    pub server_properties: Table,
    pub mechanisms: String,
    pub locales: String,
}

impl Method for Start {
    fn name(&self) -> &'static str {
        "connection.start"
    }

    fn id(&self) -> u16 {
        10
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Parses the arguments of a `connection.start` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (10, 10), when the table cannot be decoded,
    /// or when the argument bytes end before a field is complete. Invalid UTF-8 in
    /// a string is replaced lossily, not rejected.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Start> {
        if (method_frame.class_id, method_frame.method_id) != (10, 10) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let version_major = try!(reader.read_u8());
        let version_minor = try!(reader.read_u8());
        let server_properties = try!(decode_table(reader));
        let mechanisms = {
            let size = try!(reader.read_u32::<BigEndian>()) as usize;
            // The length comes from the peer: check it before allocating `size` bytes.
            if size > reader.len() {
                return Err(AMQPError::DecodeError("String length exceeds remaining frame data"));
            }
            let mut buffer = vec![0u8; size];
            // read_exact, so a short payload is an error instead of a zero-padded string.
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        let locales = {
            let size = try!(reader.read_u32::<BigEndian>()) as usize;
            if size > reader.len() {
                return Err(AMQPError::DecodeError("String length exceeds remaining frame data"));
            }
            let mut buffer = vec![0u8; size];
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        Ok(Start {
            version_major: version_major,
            version_minor: version_minor,
            server_properties: server_properties,
            mechanisms: mechanisms,
            locales: locales,
        })
    }

    /// Serializes the arguments in the layout described on the struct.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut writer = vec![];
        try!(writer.write_u8(self.version_major));
        try!(writer.write_u8(self.version_minor));
        try!(encode_table(&mut writer, &self.server_properties));
        try!(writer.write_u32::<BigEndian>(self.mechanisms.len() as u32));
        try!(writer.write_all(self.mechanisms.as_bytes()));
        try!(writer.write_u32::<BigEndian>(self.locales.len() as u32));
        try!(writer.write_all(self.locales.as_bytes()));
        Ok(writer)
    }
}

impl Start {
    /// Builds a `Start` with version 0.9, mechanisms `"PLAIN"` and locales `"en_US"`.
    pub fn with_default_values(server_properties: Table) -> Start {
        Start {
            version_major: 0,
            version_minor: 9,
            mechanisms: "PLAIN".to_owned(),
            locales: "en_US".to_owned(),
            server_properties: server_properties,
        }
    }
}
/// `connection.start-ok` (class 10, method 11).
///
/// Argument layout on the wire, in order: `client_properties` (table), `mechanism`
/// (u8 length + bytes), `response` (big-endian u32 length + bytes), `locale`
/// (u8 length + bytes).
#[derive(Debug)]
pub struct StartOk {
    pub client_properties: Table,
    pub mechanism: String,
    pub response: String,
    pub locale: String,
}

impl Method for StartOk {
    fn name(&self) -> &'static str {
        "connection.start-ok"
    }

    fn id(&self) -> u16 {
        11
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Parses the arguments of a `connection.start-ok` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (10, 11), when the table cannot be decoded,
    /// or when the argument bytes end before a field is complete. Invalid UTF-8 in
    /// a string is replaced lossily, not rejected.
    fn decode(method_frame: MethodFrame) -> AMQPResult<StartOk> {
        if (method_frame.class_id, method_frame.method_id) != (10, 11) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let client_properties = try!(decode_table(reader));
        let mechanism = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // read_exact, so a short payload is an error instead of a zero-padded string.
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        let response = {
            let size = try!(reader.read_u32::<BigEndian>()) as usize;
            // The length comes from the peer: check it before allocating `size` bytes.
            if size > reader.len() {
                return Err(AMQPError::DecodeError("String length exceeds remaining frame data"));
            }
            let mut buffer = vec![0u8; size];
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        let locale = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        Ok(StartOk {
            client_properties: client_properties,
            mechanism: mechanism,
            response: response,
            locale: locale,
        })
    }

    /// Serializes the arguments in the layout described on the struct.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `mechanism` or `locale` is longer than
    /// 255 bytes, because their length prefix is a single byte.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.mechanism.len() > 255 || self.locale.len() > 255 {
            return Err(AMQPError::Protocol("Short string field is longer than 255 bytes"
                .to_owned()));
        }
        let mut writer = vec![];
        try!(encode_table(&mut writer, &self.client_properties));
        try!(writer.write_u8(self.mechanism.len() as u8));
        try!(writer.write_all(self.mechanism.as_bytes()));
        try!(writer.write_u32::<BigEndian>(self.response.len() as u32));
        try!(writer.write_all(self.response.as_bytes()));
        try!(writer.write_u8(self.locale.len() as u8));
        try!(writer.write_all(self.locale.as_bytes()));
        Ok(writer)
    }
}

impl StartOk {
    /// Builds a `StartOk` with mechanism `"PLAIN"` and locale `"en_US"`.
    pub fn with_default_values(client_properties: Table, response: String) -> StartOk {
        StartOk {
            mechanism: "PLAIN".to_owned(),
            locale: "en_US".to_owned(),
            client_properties: client_properties,
            response: response,
        }
    }
}
/// `connection.secure` (class 10, method 20).
///
/// The single argument `challenge` is encoded as a big-endian u32 length followed
/// by that many bytes.
#[derive(Debug)]
pub struct Secure {
    pub challenge: String,
}

impl Method for Secure {
    fn name(&self) -> &'static str {
        "connection.secure"
    }

    fn id(&self) -> u16 {
        20
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Parses the arguments of a `connection.secure` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (10, 20) or when the argument bytes end
    /// before the declared string length. Invalid UTF-8 is replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Secure> {
        if (method_frame.class_id, method_frame.method_id) != (10, 20) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let size = try!(input.read_u32::<BigEndian>()) as usize;
        // The length comes from the peer: check it before allocating `size` bytes.
        if size > input.len() {
            return Err(AMQPError::DecodeError("String length exceeds remaining frame data"));
        }
        let mut buffer = vec![0u8; size];
        // read_exact, so a short payload is an error instead of a zero-padded string.
        try!(input.read_exact(&mut buffer[..]));
        Ok(Secure { challenge: String::from_utf8_lossy(&buffer[..]).into_owned() })
    }

    /// Serializes `challenge` as a u32 length prefix plus its bytes.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut writer = vec![];
        try!(writer.write_u32::<BigEndian>(self.challenge.len() as u32));
        try!(writer.write_all(self.challenge.as_bytes()));
        Ok(writer)
    }
}
/// `connection.secure-ok` (class 10, method 21).
///
/// The single argument `response` is encoded as a big-endian u32 length followed
/// by that many bytes.
#[derive(Debug)]
pub struct SecureOk {
    pub response: String,
}

impl Method for SecureOk {
    fn name(&self) -> &'static str {
        "connection.secure-ok"
    }

    fn id(&self) -> u16 {
        21
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Parses the arguments of a `connection.secure-ok` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (10, 21) or when the argument bytes end
    /// before the declared string length. Invalid UTF-8 is replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<SecureOk> {
        if (method_frame.class_id, method_frame.method_id) != (10, 21) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let size = try!(input.read_u32::<BigEndian>()) as usize;
        // The length comes from the peer: check it before allocating `size` bytes.
        if size > input.len() {
            return Err(AMQPError::DecodeError("String length exceeds remaining frame data"));
        }
        let mut buffer = vec![0u8; size];
        // read_exact, so a short payload is an error instead of a zero-padded string.
        try!(input.read_exact(&mut buffer[..]));
        Ok(SecureOk { response: String::from_utf8_lossy(&buffer[..]).into_owned() })
    }

    /// Serializes `response` as a u32 length prefix plus its bytes.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut writer = vec![];
        try!(writer.write_u32::<BigEndian>(self.response.len() as u32));
        try!(writer.write_all(self.response.as_bytes()));
        Ok(writer)
    }
}
/// `connection.tune` (class 10, method 30).
///
/// Arguments on the wire, all big-endian and in this order: `channel_max` (u16),
/// `frame_max` (u32), `heartbeat` (u16).
#[derive(Debug)]
pub struct Tune {
    pub channel_max: u16,
    pub frame_max: u32,
    pub heartbeat: u16,
}

impl Method for Tune {
    fn name(&self) -> &'static str {
        "connection.tune"
    }

    fn id(&self) -> u16 {
        30
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Checks that the frame ids are (10, 30), then reads the three integers.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Tune> {
        if (method_frame.class_id, method_frame.method_id) != (10, 30) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        // Field initializers run in source order, which is the wire order.
        Ok(Tune {
            channel_max: try!(input.read_u16::<BigEndian>()),
            frame_max: try!(input.read_u32::<BigEndian>()),
            heartbeat: try!(input.read_u16::<BigEndian>()),
        })
    }

    /// Writes the three integers in wire order.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut out: Vec<u8> = Vec::new();
        try!(out.write_u16::<BigEndian>(self.channel_max));
        try!(out.write_u32::<BigEndian>(self.frame_max));
        try!(out.write_u16::<BigEndian>(self.heartbeat));
        Ok(out)
    }
}

impl Tune {
    /// Builds a `Tune` with every field set to 0.
    pub fn with_default_values() -> Tune {
        Tune { channel_max: 0, frame_max: 0, heartbeat: 0 }
    }
}
/// `connection.tune-ok` (class 10, method 31).
///
/// Arguments on the wire, all big-endian and in this order: `channel_max` (u16),
/// `frame_max` (u32), `heartbeat` (u16).
#[derive(Debug)]
pub struct TuneOk {
    pub channel_max: u16,
    pub frame_max: u32,
    pub heartbeat: u16,
}

impl Method for TuneOk {
    fn name(&self) -> &'static str {
        "connection.tune-ok"
    }

    fn id(&self) -> u16 {
        31
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Checks that the frame ids are (10, 31), then reads the three integers.
    fn decode(method_frame: MethodFrame) -> AMQPResult<TuneOk> {
        if (method_frame.class_id, method_frame.method_id) != (10, 31) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        // Field initializers run in source order, which is the wire order.
        Ok(TuneOk {
            channel_max: try!(input.read_u16::<BigEndian>()),
            frame_max: try!(input.read_u32::<BigEndian>()),
            heartbeat: try!(input.read_u16::<BigEndian>()),
        })
    }

    /// Writes the three integers in wire order.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut out: Vec<u8> = Vec::new();
        try!(out.write_u16::<BigEndian>(self.channel_max));
        try!(out.write_u32::<BigEndian>(self.frame_max));
        try!(out.write_u16::<BigEndian>(self.heartbeat));
        Ok(out)
    }
}

impl TuneOk {
    /// Builds a `TuneOk` with every field set to 0.
    pub fn with_default_values() -> TuneOk {
        TuneOk { channel_max: 0, frame_max: 0, heartbeat: 0 }
    }
}
/// `connection.open` (class 10, method 40).
///
/// Argument layout on the wire, in order: `virtual_host` and `capabilities` (each a
/// u8 length followed by that many bytes), then one flag byte whose least
/// significant bit is `insist`.
#[derive(Debug)]
pub struct Open {
    pub virtual_host: String,
    pub capabilities: String,
    pub insist: bool,
}

impl Method for Open {
    fn name(&self) -> &'static str {
        "connection.open"
    }

    fn id(&self) -> u16 {
        40
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Parses the arguments of a `connection.open` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (10, 40) or when the argument bytes end
    /// before a field is complete. Invalid UTF-8 is replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Open> {
        if (method_frame.class_id, method_frame.method_id) != (10, 40) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let virtual_host = {
            let size = try!(input.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // read_exact, so a short payload is an error instead of a zero-padded string.
            try!(input.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        let capabilities = {
            let size = try!(input.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            try!(input.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        let flags = try!(input.read_u8());
        // Mask 0x01 is the bit a `BitVec` built from this byte exposes at index 7.
        let insist = (flags & 0x01) != 0;
        Ok(Open {
            virtual_host: virtual_host,
            capabilities: capabilities,
            insist: insist,
        })
    }

    /// Serializes the arguments in the layout described on the struct.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `virtual_host` or `capabilities` is longer
    /// than 255 bytes, because their length prefix is a single byte.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.virtual_host.len() > 255 || self.capabilities.len() > 255 {
            return Err(AMQPError::Protocol("Short string field is longer than 255 bytes"
                .to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.virtual_host.len() as u8));
        try!(writer.write_all(self.virtual_host.as_bytes()));
        try!(writer.write_u8(self.capabilities.len() as u8));
        try!(writer.write_all(self.capabilities.as_bytes()));
        try!(writer.write_u8(self.insist as u8));
        Ok(writer)
    }
}

impl Open {
    /// Builds an `Open` for virtual host `"/"` with empty capabilities.
    pub fn with_default_values(insist: bool) -> Open {
        Open {
            virtual_host: "/".to_owned(),
            capabilities: "".to_owned(),
            insist: insist,
        }
    }
}
/// `connection.open-ok` (class 10, method 41).
///
/// The single argument `known_hosts` is encoded as a u8 length followed by that
/// many bytes.
#[derive(Debug)]
pub struct OpenOk {
    pub known_hosts: String,
}

impl Method for OpenOk {
    fn name(&self) -> &'static str {
        "connection.open-ok"
    }

    fn id(&self) -> u16 {
        41
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Parses the arguments of a `connection.open-ok` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (10, 41) or when the argument bytes end
    /// before the declared string length. Invalid UTF-8 is replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<OpenOk> {
        if (method_frame.class_id, method_frame.method_id) != (10, 41) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let size = try!(input.read_u8()) as usize;
        let mut buffer = vec![0u8; size];
        // read_exact, so a short payload is an error instead of a zero-padded string.
        try!(input.read_exact(&mut buffer[..]));
        Ok(OpenOk { known_hosts: String::from_utf8_lossy(&buffer[..]).into_owned() })
    }

    /// Serializes `known_hosts` as a u8 length prefix plus its bytes.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `known_hosts` is longer than 255 bytes.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.known_hosts.len() > 255 {
            return Err(AMQPError::Protocol("Short string field is longer than 255 bytes"
                .to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.known_hosts.len() as u8));
        try!(writer.write_all(self.known_hosts.as_bytes()));
        Ok(writer)
    }
}

impl OpenOk {
    /// Builds an `OpenOk` with an empty `known_hosts`.
    pub fn with_default_values() -> OpenOk {
        OpenOk { known_hosts: "".to_owned() }
    }
}
#[derive(Debug)]
pub struct Close {
pub reply_code: u16,
pub reply_text: String,
pub class_id: u16,
pub method_id: u16,
}
impl Method for Close {
fn name(&self) -> &'static str {
"connection.close"
}
fn id(&self) -> u16 {
50
}
fn class_id(&self) -> u16 {
10
}
fn decode(method_frame: MethodFrame) -> AMQPResult<Close> {
match (method_frame.class_id, method_frame.method_id) {
(10, 50) => {}
(_, _) => {
return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
}
};
let reader = &mut &method_frame.arguments[..];
let reply_code = try!(reader.read_u16::<BigEndian>());
let reply_text = {
let size = try!(reader.read_u8()) as usize;
let mut buffer: Vec<u8> = vec![0u8; size];
try!(reader.read(&mut buffer[..]));
String::from_utf8_lossy(&buffer[..]).to_string()
};
let class_id = try!(reader.read_u16::<BigEndian>());
let method_id = try!(reader.read_u16::<BigEndian>());
Ok(Close {
reply_code: reply_code,
reply_text: reply_text,
class_id: class_id,
method_id: method_id,
})
}
fn encode(&self) -> AMQPResult<Vec<u8>> {
let mut writer = vec![];
try!(writer.write_u16::<BigEndian>(self.reply_code));
try!(writer.write_u8(self.reply_text.len() as u8));
try!(writer.write_all(self.reply_text.as_bytes()));
try!(writer.write_u16::<BigEndian>(self.class_id));
try!(writer.write_u16::<BigEndian>(self.method_id));
Ok(writer)
}
}
impl Close {
pub fn with_default_values(reply_code: u16, class_id: u16, method_id: u16) -> Close {
Close {
reply_text: "".to_owned(),
reply_code: reply_code,
class_id: class_id,
method_id: method_id,
}
}
}
/// `connection.close-ok` (class 10, method 51). Carries no arguments.
#[derive(Debug)]
pub struct CloseOk;

impl Method for CloseOk {
    fn name(&self) -> &'static str {
        "connection.close-ok"
    }

    fn id(&self) -> u16 {
        51
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Succeeds only when the frame ids are (10, 51); the argument bytes are ignored.
    fn decode(method_frame: MethodFrame) -> AMQPResult<CloseOk> {
        if (method_frame.class_id, method_frame.method_id) == (10, 51) {
            Ok(CloseOk)
        } else {
            Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
        }
    }

    /// Always yields an empty argument buffer.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }
}
/// `connection.blocked` (class 10, method 60).
///
/// The single argument `reason` is encoded as a u8 length followed by that many bytes.
#[derive(Debug)]
pub struct Blocked {
    pub reason: String,
}

impl Method for Blocked {
    fn name(&self) -> &'static str {
        "connection.blocked"
    }

    fn id(&self) -> u16 {
        60
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Parses the arguments of a `connection.blocked` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (10, 60) or when the argument bytes end
    /// before the declared string length. Invalid UTF-8 is replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Blocked> {
        if (method_frame.class_id, method_frame.method_id) != (10, 60) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let size = try!(input.read_u8()) as usize;
        let mut buffer = vec![0u8; size];
        // read_exact, so a short payload is an error instead of a zero-padded string.
        try!(input.read_exact(&mut buffer[..]));
        Ok(Blocked { reason: String::from_utf8_lossy(&buffer[..]).into_owned() })
    }

    /// Serializes `reason` as a u8 length prefix plus its bytes.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `reason` is longer than 255 bytes.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.reason.len() > 255 {
            return Err(AMQPError::Protocol("Short string field is longer than 255 bytes"
                .to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.reason.len() as u8));
        try!(writer.write_all(self.reason.as_bytes()));
        Ok(writer)
    }
}

impl Blocked {
    /// Builds a `Blocked` with an empty `reason`.
    pub fn with_default_values() -> Blocked {
        Blocked { reason: "".to_owned() }
    }
}
/// `connection.unblocked` (class 10, method 61). Carries no arguments.
#[derive(Debug)]
pub struct Unblocked;

impl Method for Unblocked {
    fn name(&self) -> &'static str {
        "connection.unblocked"
    }

    fn id(&self) -> u16 {
        61
    }

    fn class_id(&self) -> u16 {
        10
    }

    /// Succeeds only when the frame ids are (10, 61); the argument bytes are ignored.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Unblocked> {
        if (method_frame.class_id, method_frame.method_id) == (10, 61) {
            Ok(Unblocked)
        } else {
            Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
        }
    }

    /// Always yields an empty argument buffer.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }
}
}
#[allow(unused_imports)]
#[allow(missing_copy_implementations)]
pub mod channel {
use bit_vec::BitVec;
use table;
use table::{Table, decode_table, encode_table};
use protocol::{Method, MethodFrame};
use framing::ContentHeaderFrame;
use amqp_error::{AMQPResult, AMQPError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};
/// `channel.open` (class 20, method 10).
///
/// The single argument `out_of_band` is encoded as a u8 length followed by that
/// many bytes.
#[derive(Debug)]
pub struct Open {
    pub out_of_band: String,
}

impl Method for Open {
    fn name(&self) -> &'static str {
        "channel.open"
    }

    fn id(&self) -> u16 {
        10
    }

    fn class_id(&self) -> u16 {
        20
    }

    /// Parses the arguments of a `channel.open` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (20, 10) or when the argument bytes end
    /// before the declared string length. Invalid UTF-8 is replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Open> {
        if (method_frame.class_id, method_frame.method_id) != (20, 10) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let size = try!(input.read_u8()) as usize;
        let mut buffer = vec![0u8; size];
        // read_exact, so a short payload is an error instead of a zero-padded string.
        try!(input.read_exact(&mut buffer[..]));
        Ok(Open { out_of_band: String::from_utf8_lossy(&buffer[..]).into_owned() })
    }

    /// Serializes `out_of_band` as a u8 length prefix plus its bytes.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `out_of_band` is longer than 255 bytes.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.out_of_band.len() > 255 {
            return Err(AMQPError::Protocol("Short string field is longer than 255 bytes"
                .to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.out_of_band.len() as u8));
        try!(writer.write_all(self.out_of_band.as_bytes()));
        Ok(writer)
    }
}

impl Open {
    /// Builds an `Open` with an empty `out_of_band`.
    pub fn with_default_values() -> Open {
        Open { out_of_band: "".to_owned() }
    }
}
/// `channel.open-ok` (class 20, method 11).
///
/// The single argument `channel_id` is encoded as a big-endian u32 length followed
/// by that many bytes.
#[derive(Debug)]
pub struct OpenOk {
    pub channel_id: String,
}

impl Method for OpenOk {
    fn name(&self) -> &'static str {
        "channel.open-ok"
    }

    fn id(&self) -> u16 {
        11
    }

    fn class_id(&self) -> u16 {
        20
    }

    /// Parses the arguments of a `channel.open-ok` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (20, 11) or when the argument bytes end
    /// before the declared string length. Invalid UTF-8 is replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<OpenOk> {
        if (method_frame.class_id, method_frame.method_id) != (20, 11) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let size = try!(input.read_u32::<BigEndian>()) as usize;
        // The length comes from the peer: check it before allocating `size` bytes.
        if size > input.len() {
            return Err(AMQPError::DecodeError("String length exceeds remaining frame data"));
        }
        let mut buffer = vec![0u8; size];
        // read_exact, so a short payload is an error instead of a zero-padded string.
        try!(input.read_exact(&mut buffer[..]));
        Ok(OpenOk { channel_id: String::from_utf8_lossy(&buffer[..]).into_owned() })
    }

    /// Serializes `channel_id` as a u32 length prefix plus its bytes.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut writer = vec![];
        try!(writer.write_u32::<BigEndian>(self.channel_id.len() as u32));
        try!(writer.write_all(self.channel_id.as_bytes()));
        Ok(writer)
    }
}

impl OpenOk {
    /// Builds an `OpenOk` with an empty `channel_id`.
    pub fn with_default_values() -> OpenOk {
        OpenOk { channel_id: "".to_owned() }
    }
}
/// `channel.flow` (class 20, method 20).
///
/// The single argument is one flag byte; `active` is the bit at `BitVec` index 7.
#[derive(Debug)]
pub struct Flow {
    pub active: bool,
}

impl Method for Flow {
    fn name(&self) -> &'static str {
        "channel.flow"
    }

    fn id(&self) -> u16 {
        20
    }

    fn class_id(&self) -> u16 {
        20
    }

    /// Checks that the frame ids are (20, 20), then reads the flag byte.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Flow> {
        if (method_frame.class_id, method_frame.method_id) != (20, 20) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let flags = BitVec::from_bytes(&[try!(input.read_u8())]);
        match flags.get(7) {
            Some(active) => Ok(Flow { active: active }),
            None => Err(AMQPError::Protocol("Bitmap is not correct".to_owned())),
        }
    }

    /// Emits a single byte with only the `active` bit possibly set.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut flags = BitVec::from_elem(8, false);
        flags.set(7, self.active);
        Ok(flags.to_bytes())
    }
}
/// `channel.flow-ok` (class 20, method 21).
///
/// The single argument is one flag byte; `active` is the bit at `BitVec` index 7.
#[derive(Debug)]
pub struct FlowOk {
    pub active: bool,
}

impl Method for FlowOk {
    fn name(&self) -> &'static str {
        "channel.flow-ok"
    }

    fn id(&self) -> u16 {
        21
    }

    fn class_id(&self) -> u16 {
        20
    }

    /// Checks that the frame ids are (20, 21), then reads the flag byte.
    fn decode(method_frame: MethodFrame) -> AMQPResult<FlowOk> {
        if (method_frame.class_id, method_frame.method_id) != (20, 21) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let flags = BitVec::from_bytes(&[try!(input.read_u8())]);
        match flags.get(7) {
            Some(active) => Ok(FlowOk { active: active }),
            None => Err(AMQPError::Protocol("Bitmap is not correct".to_owned())),
        }
    }

    /// Emits a single byte with only the `active` bit possibly set.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut flags = BitVec::from_elem(8, false);
        flags.set(7, self.active);
        Ok(flags.to_bytes())
    }
}
#[derive(Debug)]
pub struct Close {
pub reply_code: u16,
pub reply_text: String,
pub class_id: u16,
pub method_id: u16,
}
impl Method for Close {
fn name(&self) -> &'static str {
"channel.close"
}
fn id(&self) -> u16 {
40
}
fn class_id(&self) -> u16 {
20
}
fn decode(method_frame: MethodFrame) -> AMQPResult<Close> {
match (method_frame.class_id, method_frame.method_id) {
(20, 40) => {}
(_, _) => {
return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
}
};
let reader = &mut &method_frame.arguments[..];
let reply_code = try!(reader.read_u16::<BigEndian>());
let reply_text = {
let size = try!(reader.read_u8()) as usize;
let mut buffer: Vec<u8> = vec![0u8; size];
try!(reader.read(&mut buffer[..]));
String::from_utf8_lossy(&buffer[..]).to_string()
};
let class_id = try!(reader.read_u16::<BigEndian>());
let method_id = try!(reader.read_u16::<BigEndian>());
Ok(Close {
reply_code: reply_code,
reply_text: reply_text,
class_id: class_id,
method_id: method_id,
})
}
fn encode(&self) -> AMQPResult<Vec<u8>> {
let mut writer = vec![];
try!(writer.write_u16::<BigEndian>(self.reply_code));
try!(writer.write_u8(self.reply_text.len() as u8));
try!(writer.write_all(self.reply_text.as_bytes()));
try!(writer.write_u16::<BigEndian>(self.class_id));
try!(writer.write_u16::<BigEndian>(self.method_id));
Ok(writer)
}
}
impl Close {
pub fn with_default_values(reply_code: u16, class_id: u16, method_id: u16) -> Close {
Close {
reply_text: "".to_owned(),
reply_code: reply_code,
class_id: class_id,
method_id: method_id,
}
}
}
/// `channel.close-ok` (class 20, method 41). Carries no arguments.
#[derive(Debug)]
pub struct CloseOk;

impl Method for CloseOk {
    fn name(&self) -> &'static str {
        "channel.close-ok"
    }

    fn id(&self) -> u16 {
        41
    }

    fn class_id(&self) -> u16 {
        20
    }

    /// Succeeds only when the frame ids are (20, 41); the argument bytes are ignored.
    fn decode(method_frame: MethodFrame) -> AMQPResult<CloseOk> {
        if (method_frame.class_id, method_frame.method_id) == (20, 41) {
            Ok(CloseOk)
        } else {
            Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
        }
    }

    /// Always yields an empty argument buffer.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }
}
}
#[allow(unused_imports)]
#[allow(missing_copy_implementations)]
pub mod access {
use bit_vec::BitVec;
use table;
use table::{Table, decode_table, encode_table};
use protocol::{Method, MethodFrame};
use framing::ContentHeaderFrame;
use amqp_error::{AMQPResult, AMQPError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};
/// `access.request` (class 30, method 10).
///
/// Argument layout on the wire, in order: `realm` (u8 length + bytes), then one flag
/// byte packed from the least significant bit upward: `exclusive` (0x01),
/// `passive` (0x02), `active` (0x04), `write` (0x08), `read` (0x10).
#[derive(Debug)]
pub struct Request {
    pub realm: String,
    pub exclusive: bool,
    pub passive: bool,
    pub active: bool,
    pub write: bool,
    pub read: bool,
}

impl Method for Request {
    fn name(&self) -> &'static str {
        "access.request"
    }

    fn id(&self) -> u16 {
        10
    }

    fn class_id(&self) -> u16 {
        30
    }

    /// Parses the arguments of an `access.request` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (30, 10) or when the argument bytes end
    /// before a field is complete. Invalid UTF-8 is replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Request> {
        if (method_frame.class_id, method_frame.method_id) != (30, 10) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let realm = {
            let size = try!(input.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // read_exact, so a short payload is an error instead of a zero-padded string.
            try!(input.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        // Masks 0x01..0x10 are the bits a `BitVec` built from this byte exposes at
        // indices 7 down to 3 (index 0 is the most significant bit).
        let flags = try!(input.read_u8());
        Ok(Request {
            realm: realm,
            exclusive: (flags & 0x01) != 0,
            passive: (flags & 0x02) != 0,
            active: (flags & 0x04) != 0,
            write: (flags & 0x08) != 0,
            read: (flags & 0x10) != 0,
        })
    }

    /// Serializes the arguments in the layout described on the struct.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `realm` is longer than 255 bytes, because
    /// its length prefix is a single byte.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.realm.len() > 255 {
            return Err(AMQPError::Protocol("Short string field is longer than 255 bytes"
                .to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.realm.len() as u8));
        try!(writer.write_all(self.realm.as_bytes()));
        let flags = (self.exclusive as u8) | ((self.passive as u8) << 1) |
                    ((self.active as u8) << 2) | ((self.write as u8) << 3) |
                    ((self.read as u8) << 4);
        try!(writer.write_u8(flags));
        Ok(writer)
    }
}

impl Request {
    /// Builds a `Request` for realm `"/data"` with `passive`, `active`, `write` and
    /// `read` all set to `true`.
    pub fn with_default_values(exclusive: bool) -> Request {
        Request {
            realm: "/data".to_owned(),
            passive: true,
            active: true,
            write: true,
            read: true,
            exclusive: exclusive,
        }
    }
}
/// `access.request-ok` (class 30, method 11).
///
/// The single argument `ticket` is a big-endian u16.
#[derive(Debug)]
pub struct RequestOk {
    pub ticket: u16,
}

impl Method for RequestOk {
    fn name(&self) -> &'static str {
        "access.request-ok"
    }

    fn id(&self) -> u16 {
        11
    }

    fn class_id(&self) -> u16 {
        30
    }

    /// Checks that the frame ids are (30, 11), then reads the ticket.
    fn decode(method_frame: MethodFrame) -> AMQPResult<RequestOk> {
        if (method_frame.class_id, method_frame.method_id) != (30, 11) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        Ok(RequestOk { ticket: try!(input.read_u16::<BigEndian>()) })
    }

    /// Writes the ticket as a big-endian u16.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut out: Vec<u8> = Vec::new();
        try!(out.write_u16::<BigEndian>(self.ticket));
        Ok(out)
    }
}

impl RequestOk {
    /// Builds a `RequestOk` with ticket 1.
    pub fn with_default_values() -> RequestOk {
        RequestOk { ticket: 1 }
    }
}
}
#[allow(unused_imports)]
#[allow(missing_copy_implementations)]
pub mod exchange {
use bit_vec::BitVec;
use table;
use table::{Table, decode_table, encode_table};
use protocol::{Method, MethodFrame};
use framing::ContentHeaderFrame;
use amqp_error::{AMQPResult, AMQPError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};
/// `exchange.declare` (class 40, method 10).
///
/// Argument layout on the wire, in order: `ticket` (big-endian u16), `exchange` and
/// `_type` (each a u8 length + bytes), one flag byte packed from the least
/// significant bit upward (`passive` 0x01, `durable` 0x02, `auto_delete` 0x04,
/// `internal` 0x08, `nowait` 0x10), then `arguments` (table).
#[derive(Debug)]
pub struct Declare {
    pub ticket: u16,
    pub exchange: String,
    pub _type: String,
    pub passive: bool,
    pub durable: bool,
    pub auto_delete: bool,
    pub internal: bool,
    pub nowait: bool,
    pub arguments: Table,
}

impl Method for Declare {
    fn name(&self) -> &'static str {
        "exchange.declare"
    }

    fn id(&self) -> u16 {
        10
    }

    fn class_id(&self) -> u16 {
        40
    }

    /// Parses the arguments of an `exchange.declare` frame.
    ///
    /// # Errors
    ///
    /// Fails when the frame ids are not (40, 10), when the table cannot be decoded,
    /// or when the argument bytes end before a field is complete. Invalid UTF-8 is
    /// replaced lossily.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Declare> {
        if (method_frame.class_id, method_frame.method_id) != (40, 10) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let exchange = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // read_exact, so a short payload is an error instead of a zero-padded string.
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        let _type = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).into_owned()
        };
        // Masks 0x01..0x10 are the bits a `BitVec` built from this byte exposes at
        // indices 7 down to 3 (index 0 is the most significant bit).
        let flags = try!(reader.read_u8());
        let arguments = try!(decode_table(reader));
        Ok(Declare {
            ticket: ticket,
            exchange: exchange,
            _type: _type,
            passive: (flags & 0x01) != 0,
            durable: (flags & 0x02) != 0,
            auto_delete: (flags & 0x04) != 0,
            internal: (flags & 0x08) != 0,
            nowait: (flags & 0x10) != 0,
            arguments: arguments,
        })
    }

    /// Serializes the arguments in the layout described on the struct.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `exchange` or `_type` is longer than 255
    /// bytes, because their length prefix is a single byte.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.exchange.len() > 255 || self._type.len() > 255 {
            return Err(AMQPError::Protocol("Short string field is longer than 255 bytes"
                .to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(writer.write_u8(self.exchange.len() as u8));
        try!(writer.write_all(self.exchange.as_bytes()));
        try!(writer.write_u8(self._type.len() as u8));
        try!(writer.write_all(self._type.as_bytes()));
        let flags = (self.passive as u8) | ((self.durable as u8) << 1) |
                    ((self.auto_delete as u8) << 2) | ((self.internal as u8) << 3) |
                    ((self.nowait as u8) << 4);
        try!(writer.write_u8(flags));
        try!(encode_table(&mut writer, &self.arguments));
        Ok(writer)
    }
}

impl Declare {
    /// Builds a `Declare` with ticket 0, type `"direct"` and an empty argument table.
    pub fn with_default_values(exchange: String,
                               passive: bool,
                               durable: bool,
                               auto_delete: bool,
                               internal: bool,
                               nowait: bool)
                               -> Declare {
        Declare {
            ticket: 0,
            _type: "direct".to_owned(),
            arguments: Table::new(),
            exchange: exchange,
            passive: passive,
            durable: durable,
            auto_delete: auto_delete,
            internal: internal,
            nowait: nowait,
        }
    }
}
/// `exchange.declare-ok` (class 40, method 11). Carries no arguments.
#[derive(Debug)]
pub struct DeclareOk;

impl Method for DeclareOk {
    fn name(&self) -> &'static str {
        "exchange.declare-ok"
    }

    fn id(&self) -> u16 {
        11
    }

    fn class_id(&self) -> u16 {
        40
    }

    /// Succeeds only when the frame ids are (40, 11); the argument bytes are ignored.
    fn decode(method_frame: MethodFrame) -> AMQPResult<DeclareOk> {
        if (method_frame.class_id, method_frame.method_id) == (40, 11) {
            Ok(DeclareOk)
        } else {
            Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
        }
    }

    /// Always yields an empty argument buffer.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }
}
#[derive(Debug)]
pub struct Delete {
pub ticket: u16,
pub exchange: String,
pub if_unused: bool,
pub nowait: bool,
}
impl Method for Delete {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "exchange.delete"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        20
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        40
    }

    /// Decodes the arguments of `method_frame`: ticket (big-endian `u16`),
    /// exchange (length-prefixed string, invalid UTF-8 replaced lossily) and
    /// one flags octet holding `if_unused` and `nowait`.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 40 / method 20.
    /// A read error is propagated when the arguments end early, including
    /// in the middle of the exchange name.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Delete> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (40, 20) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let exchange = try!(read_short_string(reader));
        let bits = BitVec::from_bytes(&[try!(reader.read_u8())]);
        let flag = |index: usize| -> AMQPResult<bool> {
            bits.get(index).ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned()))
        };
        let if_unused = try!(flag(7));
        let nowait = try!(flag(6));
        Ok(Delete {
            ticket: ticket,
            exchange: exchange,
            if_unused: if_unused,
            nowait: nowait,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when `exchange` is longer than 255 bytes, the
    /// most a one-octet length prefix can describe. Write errors are
    /// propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_short_string(&mut writer, &self.exchange));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.if_unused);
        bits.set(6, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        Ok(writer)
    }
}
impl Delete {
    /// Builds a `Delete` from the given exchange name and flags, with
    /// `ticket` fixed at 0.
    pub fn with_default_values(exchange: String, if_unused: bool, nowait: bool) -> Delete {
        let default_ticket = 0;
        Delete {
            exchange: exchange,
            nowait: nowait,
            if_unused: if_unused,
            ticket: default_ticket,
        }
    }
}
/// The `exchange.delete-ok` method (class 40, method 21).
///
/// It carries no arguments: its encoded payload is empty.
#[derive(Debug)]
pub struct DeleteOk;
impl Method for DeleteOk {
    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        40
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        21
    }

    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "exchange.delete-ok"
    }

    /// Produces the payload, which is empty because there are no arguments.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }

    /// Accepts `method_frame` only when it carries class id 40 and method
    /// id 21; any other pair yields `AMQPError::DecodeError`. The frame's
    /// argument bytes are not inspected.
    fn decode(method_frame: MethodFrame) -> AMQPResult<DeleteOk> {
        let ids = (method_frame.class_id, method_frame.method_id);
        if ids != (40, 21) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        Ok(DeleteOk)
    }
}
/// Arguments of the `exchange.bind` method (class 40, method 30).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug)]
pub struct Bind {
/// Encoded as a big-endian `u16`; `with_default_values` sets it to 0.
pub ticket: u16,
/// Encoded as one length octet followed by the string bytes.
pub destination: String,
/// Encoded as one length octet followed by the string bytes.
pub source: String,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub routing_key: String,
/// Stored at `BitVec` index 7 of the single flags octet.
pub nowait: bool,
/// Encoded last, via `encode_table`.
pub arguments: Table,
}
impl Method for Bind {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "exchange.bind"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        30
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        40
    }

    /// Decodes the arguments of `method_frame`: ticket (big-endian `u16`),
    /// then destination, source and routing key as length-prefixed strings
    /// (invalid UTF-8 replaced lossily), one flags octet holding `nowait`,
    /// and finally the arguments table.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 40 / method 30.
    /// Read errors are propagated when the arguments end early, including
    /// in the middle of a string; errors from `decode_table` are propagated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Bind> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (40, 30) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let destination = try!(read_short_string(reader));
        let source = try!(read_short_string(reader));
        let routing_key = try!(read_short_string(reader));
        let bits = BitVec::from_bytes(&[try!(reader.read_u8())]);
        let nowait = try!(bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        let arguments = try!(decode_table(reader));
        Ok(Bind {
            ticket: ticket,
            destination: destination,
            source: source,
            routing_key: routing_key,
            nowait: nowait,
            arguments: arguments,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when any of the three strings is longer than
    /// 255 bytes, the most a one-octet length prefix can describe. Write
    /// errors and `encode_table` errors are propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_short_string(&mut writer, &self.destination));
        try!(write_short_string(&mut writer, &self.source));
        try!(write_short_string(&mut writer, &self.routing_key));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        try!(encode_table(&mut writer, &self.arguments));
        Ok(writer)
    }
}
impl Bind {
    /// Builds a `Bind` from the given destination, source and `nowait`
    /// flag. `ticket` is 0, `routing_key` is empty and `arguments` is an
    /// empty table.
    pub fn with_default_values(destination: String, source: String, nowait: bool) -> Bind {
        let empty_key = String::new();
        let no_arguments = Table::new();
        Bind {
            ticket: 0,
            destination: destination,
            source: source,
            routing_key: empty_key,
            nowait: nowait,
            arguments: no_arguments,
        }
    }
}
/// The `exchange.bind-ok` method (class 40, method 31).
///
/// It carries no arguments: its encoded payload is empty.
#[derive(Debug)]
pub struct BindOk;
impl Method for BindOk {
    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        40
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        31
    }

    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "exchange.bind-ok"
    }

    /// Produces the payload, which is empty because there are no arguments.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }

    /// Accepts `method_frame` only when it carries class id 40 and method
    /// id 31; any other pair yields `AMQPError::DecodeError`. The frame's
    /// argument bytes are not inspected.
    fn decode(method_frame: MethodFrame) -> AMQPResult<BindOk> {
        let ids = (method_frame.class_id, method_frame.method_id);
        if ids != (40, 31) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        Ok(BindOk)
    }
}
/// Arguments of the `exchange.unbind` method (class 40, method 40).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug)]
pub struct Unbind {
/// Encoded as a big-endian `u16`; `with_default_values` sets it to 0.
pub ticket: u16,
/// Encoded as one length octet followed by the string bytes.
pub destination: String,
/// Encoded as one length octet followed by the string bytes.
pub source: String,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub routing_key: String,
/// Stored at `BitVec` index 7 of the single flags octet.
pub nowait: bool,
/// Encoded last, via `encode_table`.
pub arguments: Table,
}
impl Method for Unbind {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "exchange.unbind"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        40
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        40
    }

    /// Decodes the arguments of `method_frame`: ticket (big-endian `u16`),
    /// then destination, source and routing key as length-prefixed strings
    /// (invalid UTF-8 replaced lossily), one flags octet holding `nowait`,
    /// and finally the arguments table.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 40 / method 40.
    /// Read errors are propagated when the arguments end early, including
    /// in the middle of a string; errors from `decode_table` are propagated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Unbind> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (40, 40) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let destination = try!(read_short_string(reader));
        let source = try!(read_short_string(reader));
        let routing_key = try!(read_short_string(reader));
        let bits = BitVec::from_bytes(&[try!(reader.read_u8())]);
        let nowait = try!(bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        let arguments = try!(decode_table(reader));
        Ok(Unbind {
            ticket: ticket,
            destination: destination,
            source: source,
            routing_key: routing_key,
            nowait: nowait,
            arguments: arguments,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when any of the three strings is longer than
    /// 255 bytes, the most a one-octet length prefix can describe. Write
    /// errors and `encode_table` errors are propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_short_string(&mut writer, &self.destination));
        try!(write_short_string(&mut writer, &self.source));
        try!(write_short_string(&mut writer, &self.routing_key));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        try!(encode_table(&mut writer, &self.arguments));
        Ok(writer)
    }
}
impl Unbind {
    /// Builds an `Unbind` from the given destination, source and `nowait`
    /// flag. `ticket` is 0, `routing_key` is empty and `arguments` is an
    /// empty table.
    pub fn with_default_values(destination: String, source: String, nowait: bool) -> Unbind {
        let empty_key = String::new();
        let no_arguments = Table::new();
        Unbind {
            ticket: 0,
            destination: destination,
            source: source,
            routing_key: empty_key,
            nowait: nowait,
            arguments: no_arguments,
        }
    }
}
/// The `exchange.unbind-ok` method (class 40, method 51).
///
/// It carries no arguments: its encoded payload is empty.
#[derive(Debug)]
pub struct UnbindOk;
impl Method for UnbindOk {
    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        40
    }

    /// Id of this method within its class.
    // NOTE(review): 51 breaks the "request id + 1" pattern of the sibling
    // `-ok` methods; presumably this follows the broker's protocol
    // definition. Confirm against the spec before changing.
    fn id(&self) -> u16 {
        51
    }

    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "exchange.unbind-ok"
    }

    /// Produces the payload, which is empty because there are no arguments.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }

    /// Accepts `method_frame` only when it carries class id 40 and method
    /// id 51; any other pair yields `AMQPError::DecodeError`. The frame's
    /// argument bytes are not inspected.
    fn decode(method_frame: MethodFrame) -> AMQPResult<UnbindOk> {
        let ids = (method_frame.class_id, method_frame.method_id);
        if ids != (40, 51) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        Ok(UnbindOk)
    }
}
}
#[allow(unused_imports)]
#[allow(missing_copy_implementations)]
pub mod queue {
use bit_vec::BitVec;
use table;
use table::{Table, decode_table, encode_table};
use protocol::{Method, MethodFrame};
use framing::ContentHeaderFrame;
use amqp_error::{AMQPResult, AMQPError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};
/// Arguments of the `queue.declare` method (class 50, method 10).
///
/// The five booleans share one flags octet on the wire.
#[derive(Debug)]
pub struct Declare {
/// Encoded first, as a big-endian `u16`; `with_default_values` sets it to 0.
pub ticket: u16,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub queue: String,
/// Stored at `BitVec` index 7 of the flags octet.
pub passive: bool,
/// Stored at `BitVec` index 6 of the flags octet.
pub durable: bool,
/// Stored at `BitVec` index 5 of the flags octet.
pub exclusive: bool,
/// Stored at `BitVec` index 4 of the flags octet.
pub auto_delete: bool,
/// Stored at `BitVec` index 3 of the flags octet.
pub nowait: bool,
/// Encoded last, via `encode_table`.
pub arguments: Table,
}
impl Method for Declare {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.declare"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        10
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Decodes the arguments of `method_frame`: ticket (big-endian `u16`),
    /// queue (length-prefixed string, invalid UTF-8 replaced lossily), one
    /// flags octet holding the five booleans, then the arguments table.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 50 / method 10.
    /// Read errors are propagated when the arguments end early, including
    /// in the middle of the queue name; errors from `decode_table` are
    /// propagated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Declare> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (50, 10) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let queue = try!(read_short_string(reader));
        let bits = BitVec::from_bytes(&[try!(reader.read_u8())]);
        let flag = |index: usize| -> AMQPResult<bool> {
            bits.get(index).ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned()))
        };
        let passive = try!(flag(7));
        let durable = try!(flag(6));
        let exclusive = try!(flag(5));
        let auto_delete = try!(flag(4));
        let nowait = try!(flag(3));
        let arguments = try!(decode_table(reader));
        Ok(Declare {
            ticket: ticket,
            queue: queue,
            passive: passive,
            durable: durable,
            exclusive: exclusive,
            auto_delete: auto_delete,
            nowait: nowait,
            arguments: arguments,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when `queue` is longer than 255 bytes, the most
    /// a one-octet length prefix can describe. Write errors and
    /// `encode_table` errors are propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_short_string(&mut writer, &self.queue));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.passive);
        bits.set(6, self.durable);
        bits.set(5, self.exclusive);
        bits.set(4, self.auto_delete);
        bits.set(3, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        try!(encode_table(&mut writer, &self.arguments));
        Ok(writer)
    }
}
impl Declare {
    /// Builds a `Declare` from the given flags. `ticket` is 0, `queue` is
    /// the empty string and `arguments` is an empty table.
    pub fn with_default_values(passive: bool,
                               durable: bool,
                               exclusive: bool,
                               auto_delete: bool,
                               nowait: bool)
                               -> Declare {
        let unnamed = String::new();
        let no_arguments = Table::new();
        Declare {
            ticket: 0,
            queue: unnamed,
            passive: passive,
            durable: durable,
            exclusive: exclusive,
            auto_delete: auto_delete,
            nowait: nowait,
            arguments: no_arguments,
        }
    }
}
/// Arguments of the `queue.declare-ok` method (class 50, method 11).
#[derive(Debug)]
pub struct DeclareOk {
/// Encoded first, as one length octet followed by the string bytes.
pub queue: String,
/// Encoded as a big-endian `u32`.
pub message_count: u32,
/// Encoded as a big-endian `u32`.
pub consumer_count: u32,
}
impl Method for DeclareOk {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.declare-ok"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        11
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Decodes the arguments of `method_frame`: queue (length-prefixed
    /// string, invalid UTF-8 replaced lossily) followed by the message and
    /// consumer counts as big-endian `u32` values.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 50 / method 11.
    /// Read errors are propagated when the arguments end early, including
    /// in the middle of the queue name.
    fn decode(method_frame: MethodFrame) -> AMQPResult<DeclareOk> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (50, 11) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let queue = try!(read_short_string(reader));
        let message_count = try!(reader.read_u32::<BigEndian>());
        let consumer_count = try!(reader.read_u32::<BigEndian>());
        Ok(DeclareOk {
            queue: queue,
            message_count: message_count,
            consumer_count: consumer_count,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when `queue` is longer than 255 bytes, the most
    /// a one-octet length prefix can describe. Write errors are propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(write_short_string(&mut writer, &self.queue));
        try!(writer.write_u32::<BigEndian>(self.message_count));
        try!(writer.write_u32::<BigEndian>(self.consumer_count));
        Ok(writer)
    }
}
/// Arguments of the `queue.bind` method (class 50, method 20).
///
/// Fields are listed in the order they appear on the wire.
#[derive(Debug)]
pub struct Bind {
/// Encoded as a big-endian `u16`; `with_default_values` sets it to 0.
pub ticket: u16,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub queue: String,
/// Encoded as one length octet followed by the string bytes.
pub exchange: String,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub routing_key: String,
/// Stored at `BitVec` index 7 of the single flags octet.
pub nowait: bool,
/// Encoded last, via `encode_table`.
pub arguments: Table,
}
impl Method for Bind {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.bind"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        20
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Decodes the arguments of `method_frame`: ticket (big-endian `u16`),
    /// then queue, exchange and routing key as length-prefixed strings
    /// (invalid UTF-8 replaced lossily), one flags octet holding `nowait`,
    /// and finally the arguments table.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 50 / method 20.
    /// Read errors are propagated when the arguments end early, including
    /// in the middle of a string; errors from `decode_table` are propagated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Bind> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (50, 20) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let queue = try!(read_short_string(reader));
        let exchange = try!(read_short_string(reader));
        let routing_key = try!(read_short_string(reader));
        let bits = BitVec::from_bytes(&[try!(reader.read_u8())]);
        let nowait = try!(bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        let arguments = try!(decode_table(reader));
        Ok(Bind {
            ticket: ticket,
            queue: queue,
            exchange: exchange,
            routing_key: routing_key,
            nowait: nowait,
            arguments: arguments,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when any of the three strings is longer than
    /// 255 bytes, the most a one-octet length prefix can describe. Write
    /// errors and `encode_table` errors are propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_short_string(&mut writer, &self.queue));
        try!(write_short_string(&mut writer, &self.exchange));
        try!(write_short_string(&mut writer, &self.routing_key));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        try!(encode_table(&mut writer, &self.arguments));
        Ok(writer)
    }
}
impl Bind {
    /// Builds a `Bind` from the given exchange name and `nowait` flag.
    /// `ticket` is 0, `queue` and `routing_key` are empty strings and
    /// `arguments` is an empty table.
    pub fn with_default_values(exchange: String, nowait: bool) -> Bind {
        let no_arguments = Table::new();
        Bind {
            ticket: 0,
            queue: String::new(),
            exchange: exchange,
            routing_key: String::new(),
            nowait: nowait,
            arguments: no_arguments,
        }
    }
}
/// The `queue.bind-ok` method (class 50, method 21).
///
/// It carries no arguments: its encoded payload is empty.
#[derive(Debug)]
pub struct BindOk;
impl Method for BindOk {
    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        21
    }

    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.bind-ok"
    }

    /// Produces the payload, which is empty because there are no arguments.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }

    /// Accepts `method_frame` only when it carries class id 50 and method
    /// id 21; any other pair yields `AMQPError::DecodeError`. The frame's
    /// argument bytes are not inspected.
    fn decode(method_frame: MethodFrame) -> AMQPResult<BindOk> {
        let ids = (method_frame.class_id, method_frame.method_id);
        if ids != (50, 21) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        Ok(BindOk)
    }
}
/// Arguments of the `queue.purge` method (class 50, method 30).
#[derive(Debug)]
pub struct Purge {
/// Encoded first, as a big-endian `u16`; `with_default_values` sets it to 0.
pub ticket: u16,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub queue: String,
/// Stored at `BitVec` index 7 of the single flags octet.
pub nowait: bool,
}
impl Method for Purge {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.purge"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        30
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Decodes the arguments of `method_frame`: ticket (big-endian `u16`),
    /// queue (length-prefixed string, invalid UTF-8 replaced lossily) and
    /// one flags octet holding `nowait`.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 50 / method 30.
    /// Read errors are propagated when the arguments end early, including
    /// in the middle of the queue name.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Purge> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (50, 30) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let queue = try!(read_short_string(reader));
        let bits = BitVec::from_bytes(&[try!(reader.read_u8())]);
        let nowait = try!(bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        Ok(Purge {
            ticket: ticket,
            queue: queue,
            nowait: nowait,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when `queue` is longer than 255 bytes, the most
    /// a one-octet length prefix can describe. Write errors are propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_short_string(&mut writer, &self.queue));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        Ok(writer)
    }
}
impl Purge {
    /// Builds a `Purge` carrying the given `nowait` flag, with `ticket` 0
    /// and an empty `queue` name.
    pub fn with_default_values(nowait: bool) -> Purge {
        let unnamed = String::new();
        Purge {
            nowait: nowait,
            queue: unnamed,
            ticket: 0,
        }
    }
}
/// Arguments of the `queue.purge-ok` method (class 50, method 31).
#[derive(Debug)]
pub struct PurgeOk {
/// The only argument; encoded as a big-endian `u32`.
pub message_count: u32,
}
impl Method for PurgeOk {
    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        31
    }

    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.purge-ok"
    }

    /// Serializes `message_count` as a big-endian `u32`.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_u32::<BigEndian>(self.message_count));
        Ok(payload)
    }

    /// Reads a big-endian `u32` message count from the start of the frame
    /// arguments. Frames that are not class 50 / method 31 are rejected
    /// with `AMQPError::DecodeError`; a read error is propagated when the
    /// arguments are too short.
    fn decode(method_frame: MethodFrame) -> AMQPResult<PurgeOk> {
        let ids = (method_frame.class_id, method_frame.method_id);
        if ids != (50, 31) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut remaining = &method_frame.arguments[..];
        let count = try!(remaining.read_u32::<BigEndian>());
        Ok(PurgeOk { message_count: count })
    }
}
/// Arguments of the `queue.delete` method (class 50, method 40).
///
/// The three booleans share one flags octet on the wire.
#[derive(Debug)]
pub struct Delete {
/// Encoded first, as a big-endian `u16`; `with_default_values` sets it to 0.
pub ticket: u16,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub queue: String,
/// Stored at `BitVec` index 7 of the flags octet.
pub if_unused: bool,
/// Stored at `BitVec` index 6 of the flags octet.
pub if_empty: bool,
/// Stored at `BitVec` index 5 of the flags octet.
pub nowait: bool,
}
impl Method for Delete {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.delete"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        40
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Decodes the arguments of `method_frame`: ticket (big-endian `u16`),
    /// queue (length-prefixed string, invalid UTF-8 replaced lossily) and
    /// one flags octet holding `if_unused`, `if_empty` and `nowait`.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 50 / method 40.
    /// Read errors are propagated when the arguments end early, including
    /// in the middle of the queue name.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Delete> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (50, 40) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let queue = try!(read_short_string(reader));
        let bits = BitVec::from_bytes(&[try!(reader.read_u8())]);
        let flag = |index: usize| -> AMQPResult<bool> {
            bits.get(index).ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned()))
        };
        let if_unused = try!(flag(7));
        let if_empty = try!(flag(6));
        let nowait = try!(flag(5));
        Ok(Delete {
            ticket: ticket,
            queue: queue,
            if_unused: if_unused,
            if_empty: if_empty,
            nowait: nowait,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when `queue` is longer than 255 bytes, the most
    /// a one-octet length prefix can describe. Write errors are propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_short_string(&mut writer, &self.queue));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.if_unused);
        bits.set(6, self.if_empty);
        bits.set(5, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        Ok(writer)
    }
}
impl Delete {
    /// Builds a `Delete` from the given flags, with `ticket` 0 and an empty
    /// `queue` name.
    pub fn with_default_values(if_unused: bool, if_empty: bool, nowait: bool) -> Delete {
        let unnamed = String::new();
        Delete {
            ticket: 0,
            queue: unnamed,
            nowait: nowait,
            if_empty: if_empty,
            if_unused: if_unused,
        }
    }
}
/// Arguments of the `queue.delete-ok` method (class 50, method 41).
#[derive(Debug)]
pub struct DeleteOk {
/// The only argument; encoded as a big-endian `u32`.
pub message_count: u32,
}
impl Method for DeleteOk {
    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        41
    }

    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.delete-ok"
    }

    /// Serializes `message_count` as a big-endian `u32`.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_u32::<BigEndian>(self.message_count));
        Ok(payload)
    }

    /// Reads a big-endian `u32` message count from the start of the frame
    /// arguments. Frames that are not class 50 / method 41 are rejected
    /// with `AMQPError::DecodeError`; a read error is propagated when the
    /// arguments are too short.
    fn decode(method_frame: MethodFrame) -> AMQPResult<DeleteOk> {
        let ids = (method_frame.class_id, method_frame.method_id);
        if ids != (50, 41) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut remaining = &method_frame.arguments[..];
        let count = try!(remaining.read_u32::<BigEndian>());
        Ok(DeleteOk { message_count: count })
    }
}
/// Arguments of the `queue.unbind` method (class 50, method 50).
///
/// Fields are listed in the order they appear on the wire; there is no
/// flags octet for this method.
#[derive(Debug)]
pub struct Unbind {
/// Encoded as a big-endian `u16`; `with_default_values` sets it to 0.
pub ticket: u16,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub queue: String,
/// Encoded as one length octet followed by the string bytes.
pub exchange: String,
/// Encoded as one length octet followed by the string bytes;
/// `with_default_values` sets it to the empty string.
pub routing_key: String,
/// Encoded last, via `encode_table`.
pub arguments: Table,
}
impl Method for Unbind {
    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.unbind"
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        50
    }

    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Decodes the arguments of `method_frame`: ticket (big-endian `u16`),
    /// then queue, exchange and routing key as length-prefixed strings
    /// (invalid UTF-8 replaced lossily), and finally the arguments table.
    ///
    /// # Errors
    /// `AMQPError::DecodeError` when the frame is not class 50 / method 50.
    /// Read errors are propagated when the arguments end early, including
    /// in the middle of a string; errors from `decode_table` are propagated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Unbind> {
        // Reads one length octet, then exactly that many bytes.
        fn read_short_string(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        if (method_frame.class_id, method_frame.method_id) != (50, 50) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let queue = try!(read_short_string(reader));
        let exchange = try!(read_short_string(reader));
        let routing_key = try!(read_short_string(reader));
        let arguments = try!(decode_table(reader));
        Ok(Unbind {
            ticket: ticket,
            queue: queue,
            exchange: exchange,
            routing_key: routing_key,
            arguments: arguments,
        })
    }

    /// Serializes the arguments in the same layout `decode` reads.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when any of the three strings is longer than
    /// 255 bytes, the most a one-octet length prefix can describe. Write
    /// errors and `encode_table` errors are propagated.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_short_string(&mut writer, &self.queue));
        try!(write_short_string(&mut writer, &self.exchange));
        try!(write_short_string(&mut writer, &self.routing_key));
        try!(encode_table(&mut writer, &self.arguments));
        Ok(writer)
    }
}
impl Unbind {
    /// Builds an `Unbind` for the given exchange name. `ticket` is 0,
    /// `queue` and `routing_key` are empty strings and `arguments` is an
    /// empty table.
    pub fn with_default_values(exchange: String) -> Unbind {
        let no_arguments = Table::new();
        Unbind {
            ticket: 0,
            queue: String::new(),
            exchange: exchange,
            routing_key: String::new(),
            arguments: no_arguments,
        }
    }
}
/// The `queue.unbind-ok` method (class 50, method 51).
///
/// It carries no arguments: its encoded payload is empty.
#[derive(Debug)]
pub struct UnbindOk;
impl Method for UnbindOk {
    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        50
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        51
    }

    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "queue.unbind-ok"
    }

    /// Produces the payload, which is empty because there are no arguments.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }

    /// Accepts `method_frame` only when it carries class id 50 and method
    /// id 51; any other pair yields `AMQPError::DecodeError`. The frame's
    /// argument bytes are not inspected.
    fn decode(method_frame: MethodFrame) -> AMQPResult<UnbindOk> {
        let ids = (method_frame.class_id, method_frame.method_id);
        if ids != (50, 51) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        Ok(UnbindOk)
    }
}
}
#[allow(unused_imports)]
#[allow(missing_copy_implementations)]
pub mod basic {
use bit_vec::BitVec;
use table;
use table::{Table, decode_table, encode_table};
use protocol::{Method, MethodFrame};
use framing::ContentHeaderFrame;
use amqp_error::{AMQPResult, AMQPError};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};
/// Optional properties carried by a content header frame.
///
/// Each field that is `Some` sets one bit in the value returned by `flags`
/// (at the `BitVec` index noted on the field) and is serialized by `encode`
/// in the order the fields are declared here.
#[derive(Debug, Default, Clone)]
pub struct BasicProperties {
/// Flag index 0; encoded as one length octet followed by the string bytes.
pub content_type: Option<String>,
/// Flag index 1; encoded as one length octet followed by the string bytes.
pub content_encoding: Option<String>,
/// Flag index 2; encoded via `encode_table`.
pub headers: Option<Table>,
/// Flag index 3; encoded as a single octet.
pub delivery_mode: Option<u8>,
/// Flag index 4; encoded as a single octet.
pub priority: Option<u8>,
/// Flag index 5; encoded as one length octet followed by the string bytes.
pub correlation_id: Option<String>,
/// Flag index 6; encoded as one length octet followed by the string bytes.
pub reply_to: Option<String>,
/// Flag index 7; encoded as one length octet followed by the string bytes.
pub expiration: Option<String>,
/// Flag index 8; encoded as one length octet followed by the string bytes.
pub message_id: Option<String>,
/// Flag index 9; encoded as a big-endian `u64`.
pub timestamp: Option<u64>,
/// Flag index 10; encoded as one length octet followed by the string bytes.
pub _type: Option<String>,
/// Flag index 11; encoded as one length octet followed by the string bytes.
pub user_id: Option<String>,
/// Flag index 12; encoded as one length octet followed by the string bytes.
pub app_id: Option<String>,
/// Flag index 13; encoded as one length octet followed by the string bytes.
pub cluster_id: Option<String>,
}
impl BasicProperties {
    /// Decodes the property list of a content header frame.
    ///
    /// The 16 bits of `properties_flags` are loaded into a `BitVec`, high
    /// byte first. For every index from 0 to 13 whose bit is set, the
    /// corresponding field is read from `properties`, in field declaration
    /// order; fields whose bit is clear become `None`. String fields are
    /// length-prefixed and invalid UTF-8 is replaced lossily.
    ///
    /// # Errors
    /// `AMQPError::Protocol` if a flag index cannot be read from the bit
    /// vector. Read errors are propagated when `properties` ends before a
    /// flagged field has been fully read, including in the middle of a
    /// string; errors from `decode_table` are propagated.
    pub fn decode(content_header_frame: ContentHeaderFrame) -> AMQPResult<BasicProperties> {
        // Tells whether the property at `index` is flagged as present.
        fn is_present(flags: &BitVec, index: usize) -> AMQPResult<bool> {
            flags.get(index)
                .ok_or_else(|| AMQPError::Protocol("Properties flags are not correct".to_owned()))
        }

        // Reads one length octet, then exactly that many bytes.
        fn read_short_string<R: Read>(reader: &mut R) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer = vec![0u8; size];
            // `read` can return fewer bytes than requested without failing,
            // which left zero padding in the string; `read_exact` errors.
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).into_owned())
        }

        // Reads a string property only when its flag is set.
        fn read_optional_string<R: Read>(reader: &mut R,
                                         flags: &BitVec,
                                         index: usize)
                                         -> AMQPResult<Option<String>> {
            if try!(is_present(flags, index)) {
                Ok(Some(try!(read_short_string(reader))))
            } else {
                Ok(None)
            }
        }

        let reader = &mut &content_header_frame.properties[..];
        let flags =
            BitVec::from_bytes(&[((content_header_frame.properties_flags >> 8) & 0xff) as u8,
                                 (content_header_frame.properties_flags & 0xff) as u8]);

        // The reads below must stay in flag-index order: that is the order
        // in which `encode` writes the fields.
        let content_type = try!(read_optional_string(reader, &flags, 0));
        let content_encoding = try!(read_optional_string(reader, &flags, 1));
        let headers = if try!(is_present(&flags, 2)) {
            Some(try!(decode_table(reader)))
        } else {
            None
        };
        let delivery_mode = if try!(is_present(&flags, 3)) {
            Some(try!(reader.read_u8()))
        } else {
            None
        };
        let priority = if try!(is_present(&flags, 4)) {
            Some(try!(reader.read_u8()))
        } else {
            None
        };
        let correlation_id = try!(read_optional_string(reader, &flags, 5));
        let reply_to = try!(read_optional_string(reader, &flags, 6));
        let expiration = try!(read_optional_string(reader, &flags, 7));
        let message_id = try!(read_optional_string(reader, &flags, 8));
        let timestamp = if try!(is_present(&flags, 9)) {
            Some(try!(reader.read_u64::<BigEndian>()))
        } else {
            None
        };
        let _type = try!(read_optional_string(reader, &flags, 10));
        let user_id = try!(read_optional_string(reader, &flags, 11));
        let app_id = try!(read_optional_string(reader, &flags, 12));
        let cluster_id = try!(read_optional_string(reader, &flags, 13));

        Ok(BasicProperties {
            content_type: content_type,
            content_encoding: content_encoding,
            headers: headers,
            delivery_mode: delivery_mode,
            priority: priority,
            correlation_id: correlation_id,
            reply_to: reply_to,
            expiration: expiration,
            message_id: message_id,
            timestamp: timestamp,
            _type: _type,
            user_id: user_id,
            app_id: app_id,
            cluster_id: cluster_id,
        })
    }

    /// Serializes every property that is `Some`, in field declaration
    /// order, skipping the ones that are `None`. The flags word is not
    /// part of the output; it is available separately from `flags`.
    ///
    /// # Errors
    /// `AMQPError::Protocol` when a string property is longer than 255
    /// bytes, the most a one-octet length prefix can describe. Write errors
    /// and `encode_table` errors are propagated.
    pub fn encode(self) -> AMQPResult<Vec<u8>> {
        // Writes one length octet followed by the string bytes.
        fn write_short_string(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            // `len() as u8` wraps for longer values, producing a length
            // octet that disagrees with the bytes written after it.
            if value.len() > 255 {
                return Err(AMQPError::Protocol("Short string is longer than 255 bytes".to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = Vec::new();
        if let Some(content_type) = self.content_type {
            try!(write_short_string(&mut writer, &content_type));
        }
        if let Some(content_encoding) = self.content_encoding {
            try!(write_short_string(&mut writer, &content_encoding));
        }
        if let Some(headers) = self.headers {
            try!(encode_table(&mut writer, &headers));
        }
        if let Some(delivery_mode) = self.delivery_mode {
            try!(writer.write_u8(delivery_mode));
        }
        if let Some(priority) = self.priority {
            try!(writer.write_u8(priority));
        }
        if let Some(correlation_id) = self.correlation_id {
            try!(write_short_string(&mut writer, &correlation_id));
        }
        if let Some(reply_to) = self.reply_to {
            try!(write_short_string(&mut writer, &reply_to));
        }
        if let Some(expiration) = self.expiration {
            try!(write_short_string(&mut writer, &expiration));
        }
        if let Some(message_id) = self.message_id {
            try!(write_short_string(&mut writer, &message_id));
        }
        if let Some(timestamp) = self.timestamp {
            try!(writer.write_u64::<BigEndian>(timestamp));
        }
        if let Some(_type) = self._type {
            try!(write_short_string(&mut writer, &_type));
        }
        if let Some(user_id) = self.user_id {
            try!(write_short_string(&mut writer, &user_id));
        }
        if let Some(app_id) = self.app_id {
            try!(write_short_string(&mut writer, &app_id));
        }
        if let Some(cluster_id) = self.cluster_id {
            try!(write_short_string(&mut writer, &cluster_id));
        }
        Ok(writer)
    }

    /// Computes the 16-bit flags word announcing which properties are set.
    ///
    /// `BitVec` index `i` is set when the `i`-th field (in declaration
    /// order) is `Some`; indices 14 and 15 are always clear. The first
    /// byte of the bit vector becomes the high byte of the result.
    pub fn flags(&self) -> u16 {
        let present = [self.content_type.is_some(),
                       self.content_encoding.is_some(),
                       self.headers.is_some(),
                       self.delivery_mode.is_some(),
                       self.priority.is_some(),
                       self.correlation_id.is_some(),
                       self.reply_to.is_some(),
                       self.expiration.is_some(),
                       self.message_id.is_some(),
                       self.timestamp.is_some(),
                       self._type.is_some(),
                       self.user_id.is_some(),
                       self.app_id.is_some(),
                       self.cluster_id.is_some()];
        let mut bits = BitVec::from_elem(16, false);
        for (index, &is_set) in present.iter().enumerate() {
            bits.set(index, is_set);
        }
        // Convert once instead of allocating a byte vector per access.
        let bytes = bits.to_bytes();
        ((bytes[0] as u16) << 8) | (bytes[1] as u16)
    }
}
/// Arguments of the `basic.qos` method (class 60, method 10).
#[derive(Debug)]
pub struct Qos {
/// Encoded first, as a big-endian `u32`; `with_default_values` sets it to 0.
pub prefetch_size: u32,
/// Encoded as a big-endian `u16`; `with_default_values` sets it to 0.
pub prefetch_count: u16,
/// Stored at `BitVec` index 7 of the single flags octet.
pub global: bool,
}
impl Method for Qos {
    /// Id of the class this method belongs to.
    fn class_id(&self) -> u16 {
        60
    }

    /// Id of this method within its class.
    fn id(&self) -> u16 {
        10
    }

    /// AMQP name of this method.
    fn name(&self) -> &'static str {
        "basic.qos"
    }

    /// Serializes prefetch size (big-endian `u32`), prefetch count
    /// (big-endian `u16`) and a flags octet holding `global`.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_u32::<BigEndian>(self.prefetch_size));
        try!(payload.write_u16::<BigEndian>(self.prefetch_count));
        let mut flags = BitVec::from_elem(8, false);
        flags.set(7, self.global);
        let flag_bytes = flags.to_bytes();
        try!(payload.write_all(&flag_bytes));
        Ok(payload)
    }

    /// Decodes the arguments written by `encode`. Frames that are not
    /// class 60 / method 10 are rejected with `AMQPError::DecodeError`;
    /// read errors are propagated when the arguments are too short.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Qos> {
        let ids = (method_frame.class_id, method_frame.method_id);
        if ids != (60, 10) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut remaining = &method_frame.arguments[..];
        let size = try!(remaining.read_u32::<BigEndian>());
        let count = try!(remaining.read_u16::<BigEndian>());
        let flags = BitVec::from_bytes(&[try!(remaining.read_u8())]);
        let global = if let Some(bit) = flags.get(7) {
            bit
        } else {
            return Err(AMQPError::Protocol("Bitmap is not correct".to_owned()));
        };
        Ok(Qos {
            prefetch_size: size,
            prefetch_count: count,
            global: global,
        })
    }
}
impl Qos {
pub fn with_default_values(global: bool) -> Qos {
Qos {
prefetch_size: 0,
prefetch_count: 0,
global: global,
}
}
}
/// AMQP `basic.qos-ok` method (class 60, method 11); it carries no arguments.
#[derive(Debug)]
pub struct QosOk;

impl Method for QosOk {
    fn class_id(&self) -> u16 {
        60
    }

    fn id(&self) -> u16 {
        11
    }

    fn name(&self) -> &'static str {
        "basic.qos-ok"
    }

    /// Accepts the frame only when it is addressed to class 60 / method 11.
    fn decode(method_frame: MethodFrame) -> AMQPResult<QosOk> {
        if method_frame.class_id == 60 && method_frame.method_id == 11 {
            Ok(QosOk)
        } else {
            Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
        }
    }

    /// Produces an empty argument payload.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }
}
/// AMQP `basic.consume` method (class 60, method 20).
///
/// Wire order: `ticket`, `queue`, `consumer_tag`, one flag octet, `arguments`.
#[derive(Debug)]
pub struct Consume {
    /// Sent as a big-endian `u16`.
    pub ticket: u16,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub queue: String,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub consumer_tag: String,
    /// Lowest bit of the flag octet.
    pub no_local: bool,
    /// Second-lowest bit of the flag octet.
    pub no_ack: bool,
    /// Third-lowest bit of the flag octet.
    pub exclusive: bool,
    /// Fourth-lowest bit of the flag octet.
    pub nowait: bool,
    /// Sent last, using the table codec.
    pub arguments: Table,
}

impl Method for Consume {
    fn name(&self) -> &'static str {
        "basic.consume"
    }

    fn id(&self) -> u16 {
        20
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.consume` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 20, and
    /// propagates a read error when the payload is shorter than its fields
    /// claim (including a short string whose bytes are missing).
    fn decode(method_frame: MethodFrame) -> AMQPResult<Consume> {
        // Reads one length-prefixed short string. `read_exact` is used so a
        // truncated payload is an error; a plain `read` would silently return
        // a string padded with NUL bytes.
        fn read_shortstr(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).to_string())
        }
        // Error used when a flag bit cannot be read from the bitmap.
        fn bitmap_error() -> AMQPError {
            AMQPError::Protocol("Bitmap is not correct".to_owned())
        }

        match (method_frame.class_id, method_frame.method_id) {
            (60, 20) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let queue = try!(read_shortstr(reader));
        let consumer_tag = try!(read_shortstr(reader));
        let byte = try!(reader.read_u8());
        // BitVec indexes from the most significant bit, so 7 is the lowest bit.
        let bits = BitVec::from_bytes(&[byte]);
        let no_local = try!(bits.get(7).ok_or_else(bitmap_error));
        let no_ack = try!(bits.get(6).ok_or_else(bitmap_error));
        let exclusive = try!(bits.get(5).ok_or_else(bitmap_error));
        let nowait = try!(bits.get(4).ok_or_else(bitmap_error));
        let arguments = try!(decode_table(reader));
        Ok(Consume {
            ticket: ticket,
            queue: queue,
            consumer_tag: consumer_tag,
            no_local: no_local,
            no_ack: no_ack,
            exclusive: exclusive,
            nowait: nowait,
            arguments: arguments,
        })
    }

    /// Serialises the arguments in wire order.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `queue` or `consumer_tag` is longer
    /// than 255 bytes, because the length would not fit the single length
    /// octet and the frame would be corrupt.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length-prefixed short string, refusing values whose
        // length would wrap when narrowed to `u8`.
        fn write_shortstr(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            if value.len() > 255 {
                return Err(AMQPError::Protocol("short string is longer than 255 bytes"
                    .to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = vec![];
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_shortstr(&mut writer, &self.queue));
        try!(write_shortstr(&mut writer, &self.consumer_tag));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.no_local);
        bits.set(6, self.no_ack);
        bits.set(5, self.exclusive);
        bits.set(4, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        try!(encode_table(&mut writer, &self.arguments));
        Ok(writer)
    }
}

impl Consume {
    /// Creates a `Consume` with ticket 0, empty `queue` and `consumer_tag`,
    /// an empty argument table, and the given flags.
    pub fn with_default_values(no_local: bool,
                               no_ack: bool,
                               exclusive: bool,
                               nowait: bool)
                               -> Consume {
        Consume {
            ticket: 0,
            queue: "".to_owned(),
            consumer_tag: "".to_owned(),
            arguments: Table::new(),
            no_local: no_local,
            no_ack: no_ack,
            exclusive: exclusive,
            nowait: nowait,
        }
    }
}
/// AMQP `basic.consume-ok` method (class 60, method 21).
#[derive(Debug)]
pub struct ConsumeOk {
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub consumer_tag: String,
}

impl Method for ConsumeOk {
    fn name(&self) -> &'static str {
        "basic.consume-ok"
    }

    fn id(&self) -> u16 {
        21
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.consume-ok` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 21, and
    /// propagates a read error when the payload ends before the announced
    /// string length is satisfied.
    fn decode(method_frame: MethodFrame) -> AMQPResult<ConsumeOk> {
        match (method_frame.class_id, method_frame.method_id) {
            (60, 21) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let consumer_tag = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            // `read_exact` turns a truncated payload into an error; `read`
            // would accept it and leave the tail of the buffer as NUL bytes.
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).to_string()
        };
        Ok(ConsumeOk { consumer_tag: consumer_tag })
    }

    /// Serialises `consumer_tag` as a short string.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `consumer_tag` exceeds 255 bytes,
    /// since its length would not fit the single length octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.consumer_tag.len() > 255 {
            return Err(AMQPError::Protocol("short string is longer than 255 bytes".to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.consumer_tag.len() as u8));
        try!(writer.write_all(self.consumer_tag.as_bytes()));
        Ok(writer)
    }
}
/// AMQP `basic.cancel` method (class 60, method 30).
#[derive(Debug)]
pub struct Cancel {
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub consumer_tag: String,
    /// Lowest bit of the trailing flag octet.
    pub nowait: bool,
}

impl Method for Cancel {
    fn name(&self) -> &'static str {
        "basic.cancel"
    }

    fn id(&self) -> u16 {
        30
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.cancel` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 30, and
    /// propagates a read error when the payload is truncated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Cancel> {
        match (method_frame.class_id, method_frame.method_id) {
            (60, 30) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let consumer_tag = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            // `read_exact` turns a truncated payload into an error; `read`
            // would accept it and leave the tail of the buffer as NUL bytes.
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).to_string()
        };
        let byte = try!(reader.read_u8());
        // BitVec indexes from the most significant bit, so 7 is the lowest bit.
        let bits = BitVec::from_bytes(&[byte]);
        let nowait = try!(bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        Ok(Cancel {
            consumer_tag: consumer_tag,
            nowait: nowait,
        })
    }

    /// Serialises `consumer_tag` followed by the flag octet.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `consumer_tag` exceeds 255 bytes,
    /// since its length would not fit the single length octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.consumer_tag.len() > 255 {
            return Err(AMQPError::Protocol("short string is longer than 255 bytes".to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.consumer_tag.len() as u8));
        try!(writer.write_all(self.consumer_tag.as_bytes()));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.nowait);
        try!(writer.write_all(&bits.to_bytes()));
        Ok(writer)
    }
}
/// AMQP `basic.cancel-ok` method (class 60, method 31).
#[derive(Debug)]
pub struct CancelOk {
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub consumer_tag: String,
}

impl Method for CancelOk {
    fn name(&self) -> &'static str {
        "basic.cancel-ok"
    }

    fn id(&self) -> u16 {
        31
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.cancel-ok` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 31, and
    /// propagates a read error when the payload ends before the announced
    /// string length is satisfied.
    fn decode(method_frame: MethodFrame) -> AMQPResult<CancelOk> {
        match (method_frame.class_id, method_frame.method_id) {
            (60, 31) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let consumer_tag = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            // `read_exact` turns a truncated payload into an error; `read`
            // would accept it and leave the tail of the buffer as NUL bytes.
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).to_string()
        };
        Ok(CancelOk { consumer_tag: consumer_tag })
    }

    /// Serialises `consumer_tag` as a short string.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `consumer_tag` exceeds 255 bytes,
    /// since its length would not fit the single length octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.consumer_tag.len() > 255 {
            return Err(AMQPError::Protocol("short string is longer than 255 bytes".to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.consumer_tag.len() as u8));
        try!(writer.write_all(self.consumer_tag.as_bytes()));
        Ok(writer)
    }
}
/// AMQP `basic.publish` method (class 60, method 40).
///
/// Wire order: `ticket`, `exchange`, `routing_key`, one flag octet.
#[derive(Debug)]
pub struct Publish {
    /// Sent as a big-endian `u16`.
    pub ticket: u16,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub exchange: String,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub routing_key: String,
    /// Lowest bit of the flag octet.
    pub mandatory: bool,
    /// Second-lowest bit of the flag octet.
    pub immediate: bool,
}

impl Method for Publish {
    fn name(&self) -> &'static str {
        "basic.publish"
    }

    fn id(&self) -> u16 {
        40
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.publish` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 40, and
    /// propagates a read error when the payload is truncated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Publish> {
        // Reads one length-prefixed short string. `read_exact` is used so a
        // truncated payload is an error; a plain `read` would silently return
        // a string padded with NUL bytes.
        fn read_shortstr(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).to_string())
        }
        // Error used when a flag bit cannot be read from the bitmap.
        fn bitmap_error() -> AMQPError {
            AMQPError::Protocol("Bitmap is not correct".to_owned())
        }

        match (method_frame.class_id, method_frame.method_id) {
            (60, 40) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let exchange = try!(read_shortstr(reader));
        let routing_key = try!(read_shortstr(reader));
        let byte = try!(reader.read_u8());
        // BitVec indexes from the most significant bit, so 7 is the lowest bit.
        let bits = BitVec::from_bytes(&[byte]);
        let mandatory = try!(bits.get(7).ok_or_else(bitmap_error));
        let immediate = try!(bits.get(6).ok_or_else(bitmap_error));
        Ok(Publish {
            ticket: ticket,
            exchange: exchange,
            routing_key: routing_key,
            mandatory: mandatory,
            immediate: immediate,
        })
    }

    /// Serialises the arguments in wire order.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `exchange` or `routing_key` is
    /// longer than 255 bytes, because the length would not fit the single
    /// length octet and the frame would be corrupt.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length-prefixed short string, refusing values whose
        // length would wrap when narrowed to `u8`.
        fn write_shortstr(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            if value.len() > 255 {
                return Err(AMQPError::Protocol("short string is longer than 255 bytes"
                    .to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = vec![];
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(write_shortstr(&mut writer, &self.exchange));
        try!(write_shortstr(&mut writer, &self.routing_key));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.mandatory);
        bits.set(6, self.immediate);
        try!(writer.write_all(&bits.to_bytes()));
        Ok(writer)
    }
}

impl Publish {
    /// Creates a `Publish` with ticket 0, empty `exchange` and `routing_key`,
    /// and the given flags.
    pub fn with_default_values(mandatory: bool, immediate: bool) -> Publish {
        Publish {
            ticket: 0,
            exchange: "".to_owned(),
            routing_key: "".to_owned(),
            mandatory: mandatory,
            immediate: immediate,
        }
    }
}
#[derive(Debug)]
pub struct Return {
pub reply_code: u16,
pub reply_text: String,
pub exchange: String,
pub routing_key: String,
}
impl Method for Return {
fn name(&self) -> &'static str {
"basic.return"
}
fn id(&self) -> u16 {
50
}
fn class_id(&self) -> u16 {
60
}
fn decode(method_frame: MethodFrame) -> AMQPResult<Return> {
match (method_frame.class_id, method_frame.method_id) {
(60, 50) => {}
(_, _) => {
return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
}
};
let reader = &mut &method_frame.arguments[..];
let reply_code = try!(reader.read_u16::<BigEndian>());
let reply_text = {
let size = try!(reader.read_u8()) as usize;
let mut buffer: Vec<u8> = vec![0u8; size];
try!(reader.read(&mut buffer[..]));
String::from_utf8_lossy(&buffer[..]).to_string()
};
let exchange = {
let size = try!(reader.read_u8()) as usize;
let mut buffer: Vec<u8> = vec![0u8; size];
try!(reader.read(&mut buffer[..]));
String::from_utf8_lossy(&buffer[..]).to_string()
};
let routing_key = {
let size = try!(reader.read_u8()) as usize;
let mut buffer: Vec<u8> = vec![0u8; size];
try!(reader.read(&mut buffer[..]));
String::from_utf8_lossy(&buffer[..]).to_string()
};
Ok(Return {
reply_code: reply_code,
reply_text: reply_text,
exchange: exchange,
routing_key: routing_key,
})
}
fn encode(&self) -> AMQPResult<Vec<u8>> {
let mut writer = vec![];
try!(writer.write_u16::<BigEndian>(self.reply_code));
try!(writer.write_u8(self.reply_text.len() as u8));
try!(writer.write_all(self.reply_text.as_bytes()));
try!(writer.write_u8(self.exchange.len() as u8));
try!(writer.write_all(self.exchange.as_bytes()));
try!(writer.write_u8(self.routing_key.len() as u8));
try!(writer.write_all(self.routing_key.as_bytes()));
Ok(writer)
}
}
impl Return {
pub fn with_default_values(reply_code: u16,
exchange: String,
routing_key: String)
-> Return {
Return {
reply_text: "".to_owned(),
reply_code: reply_code,
exchange: exchange,
routing_key: routing_key,
}
}
}
/// AMQP `basic.deliver` method (class 60, method 60).
///
/// Wire order: `consumer_tag`, `delivery_tag`, one flag octet, `exchange`,
/// `routing_key`.
#[derive(Debug)]
pub struct Deliver {
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub consumer_tag: String,
    /// Sent as a big-endian `u64`.
    pub delivery_tag: u64,
    /// Lowest bit of the flag octet.
    pub redelivered: bool,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub exchange: String,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub routing_key: String,
}

impl Method for Deliver {
    fn name(&self) -> &'static str {
        "basic.deliver"
    }

    fn id(&self) -> u16 {
        60
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.deliver` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 60, and
    /// propagates a read error when the payload is truncated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Deliver> {
        // Reads one length-prefixed short string. `read_exact` is used so a
        // truncated payload is an error; a plain `read` would silently return
        // a string padded with NUL bytes.
        fn read_shortstr(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).to_string())
        }

        match (method_frame.class_id, method_frame.method_id) {
            (60, 60) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let consumer_tag = try!(read_shortstr(reader));
        let delivery_tag = try!(reader.read_u64::<BigEndian>());
        let byte = try!(reader.read_u8());
        // BitVec indexes from the most significant bit, so 7 is the lowest bit.
        let bits = BitVec::from_bytes(&[byte]);
        let redelivered = try!(bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        let exchange = try!(read_shortstr(reader));
        let routing_key = try!(read_shortstr(reader));
        Ok(Deliver {
            consumer_tag: consumer_tag,
            delivery_tag: delivery_tag,
            redelivered: redelivered,
            exchange: exchange,
            routing_key: routing_key,
        })
    }

    /// Serialises the arguments in wire order.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when any of the three strings is longer
    /// than 255 bytes, because the length would not fit the single length
    /// octet and the frame would be corrupt.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length-prefixed short string, refusing values whose
        // length would wrap when narrowed to `u8`.
        fn write_shortstr(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            if value.len() > 255 {
                return Err(AMQPError::Protocol("short string is longer than 255 bytes"
                    .to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = vec![];
        try!(write_shortstr(&mut writer, &self.consumer_tag));
        try!(writer.write_u64::<BigEndian>(self.delivery_tag));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.redelivered);
        try!(writer.write_all(&bits.to_bytes()));
        try!(write_shortstr(&mut writer, &self.exchange));
        try!(write_shortstr(&mut writer, &self.routing_key));
        Ok(writer)
    }
}
/// AMQP `basic.get` method (class 60, method 70).
///
/// Wire order: `ticket`, `queue`, one flag octet.
#[derive(Debug)]
pub struct Get {
    /// Sent as a big-endian `u16`.
    pub ticket: u16,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub queue: String,
    /// Lowest bit of the flag octet.
    pub no_ack: bool,
}

impl Method for Get {
    fn name(&self) -> &'static str {
        "basic.get"
    }

    fn id(&self) -> u16 {
        70
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.get` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 70, and
    /// propagates a read error when the payload is truncated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Get> {
        match (method_frame.class_id, method_frame.method_id) {
            (60, 70) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let ticket = try!(reader.read_u16::<BigEndian>());
        let queue = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            // `read_exact` turns a truncated payload into an error; `read`
            // would accept it and leave the tail of the buffer as NUL bytes.
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).to_string()
        };
        let byte = try!(reader.read_u8());
        // BitVec indexes from the most significant bit, so 7 is the lowest bit.
        let bits = BitVec::from_bytes(&[byte]);
        let no_ack = try!(bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        Ok(Get {
            ticket: ticket,
            queue: queue,
            no_ack: no_ack,
        })
    }

    /// Serialises the arguments in wire order.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `queue` exceeds 255 bytes, since
    /// its length would not fit the single length octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.queue.len() > 255 {
            return Err(AMQPError::Protocol("short string is longer than 255 bytes".to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u16::<BigEndian>(self.ticket));
        try!(writer.write_u8(self.queue.len() as u8));
        try!(writer.write_all(self.queue.as_bytes()));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.no_ack);
        try!(writer.write_all(&bits.to_bytes()));
        Ok(writer)
    }
}

impl Get {
    /// Creates a `Get` with ticket 0, an empty `queue`, and the given `no_ack` flag.
    pub fn with_default_values(no_ack: bool) -> Get {
        Get {
            ticket: 0,
            queue: "".to_owned(),
            no_ack: no_ack,
        }
    }
}
/// AMQP `basic.get-ok` method (class 60, method 71).
///
/// Wire order: `delivery_tag`, one flag octet, `exchange`, `routing_key`,
/// `message_count`.
#[derive(Debug)]
pub struct GetOk {
    /// Sent as a big-endian `u64`.
    pub delivery_tag: u64,
    /// Lowest bit of the flag octet.
    pub redelivered: bool,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub exchange: String,
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub routing_key: String,
    /// Sent as a big-endian `u32`.
    pub message_count: u32,
}

impl Method for GetOk {
    fn name(&self) -> &'static str {
        "basic.get-ok"
    }

    fn id(&self) -> u16 {
        71
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.get-ok` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 71, and
    /// propagates a read error when the payload is truncated.
    fn decode(method_frame: MethodFrame) -> AMQPResult<GetOk> {
        // Reads one length-prefixed short string. `read_exact` is used so a
        // truncated payload is an error; a plain `read` would silently return
        // a string padded with NUL bytes.
        fn read_shortstr(reader: &mut &[u8]) -> AMQPResult<String> {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            try!(reader.read_exact(&mut buffer[..]));
            Ok(String::from_utf8_lossy(&buffer[..]).to_string())
        }

        match (method_frame.class_id, method_frame.method_id) {
            (60, 71) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let delivery_tag = try!(reader.read_u64::<BigEndian>());
        let byte = try!(reader.read_u8());
        // BitVec indexes from the most significant bit, so 7 is the lowest bit.
        let bits = BitVec::from_bytes(&[byte]);
        let redelivered = try!(bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        let exchange = try!(read_shortstr(reader));
        let routing_key = try!(read_shortstr(reader));
        let message_count = try!(reader.read_u32::<BigEndian>());
        Ok(GetOk {
            delivery_tag: delivery_tag,
            redelivered: redelivered,
            exchange: exchange,
            routing_key: routing_key,
            message_count: message_count,
        })
    }

    /// Serialises the arguments in wire order.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `exchange` or `routing_key` is
    /// longer than 255 bytes, because the length would not fit the single
    /// length octet and the frame would be corrupt.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        // Writes one length-prefixed short string, refusing values whose
        // length would wrap when narrowed to `u8`.
        fn write_shortstr(writer: &mut Vec<u8>, value: &str) -> AMQPResult<()> {
            if value.len() > 255 {
                return Err(AMQPError::Protocol("short string is longer than 255 bytes"
                    .to_owned()));
            }
            try!(writer.write_u8(value.len() as u8));
            try!(writer.write_all(value.as_bytes()));
            Ok(())
        }

        let mut writer: Vec<u8> = vec![];
        try!(writer.write_u64::<BigEndian>(self.delivery_tag));
        let mut bits = BitVec::from_elem(8, false);
        bits.set(7, self.redelivered);
        try!(writer.write_all(&bits.to_bytes()));
        try!(write_shortstr(&mut writer, &self.exchange));
        try!(write_shortstr(&mut writer, &self.routing_key));
        try!(writer.write_u32::<BigEndian>(self.message_count));
        Ok(writer)
    }
}
/// AMQP `basic.get-empty` method (class 60, method 72).
#[derive(Debug)]
pub struct GetEmpty {
    /// Sent as a short string (one length octet, then at most 255 bytes).
    pub cluster_id: String,
}

impl Method for GetEmpty {
    fn name(&self) -> &'static str {
        "basic.get-empty"
    }

    fn id(&self) -> u16 {
        72
    }

    fn class_id(&self) -> u16 {
        60
    }

    /// Parses the arguments of a `basic.get-empty` frame.
    ///
    /// # Errors
    ///
    /// Returns `DecodeError` when the frame is not class 60 / method 72, and
    /// propagates a read error when the payload ends before the announced
    /// string length is satisfied.
    fn decode(method_frame: MethodFrame) -> AMQPResult<GetEmpty> {
        match (method_frame.class_id, method_frame.method_id) {
            (60, 72) => {}
            (_, _) => {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        };
        let reader = &mut &method_frame.arguments[..];
        let cluster_id = {
            let size = try!(reader.read_u8()) as usize;
            let mut buffer: Vec<u8> = vec![0u8; size];
            // `read_exact` turns a truncated payload into an error; `read`
            // would accept it and leave the tail of the buffer as NUL bytes.
            try!(reader.read_exact(&mut buffer[..]));
            String::from_utf8_lossy(&buffer[..]).to_string()
        };
        Ok(GetEmpty { cluster_id: cluster_id })
    }

    /// Serialises `cluster_id` as a short string.
    ///
    /// # Errors
    ///
    /// Returns `AMQPError::Protocol` when `cluster_id` exceeds 255 bytes,
    /// since its length would not fit the single length octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        if self.cluster_id.len() > 255 {
            return Err(AMQPError::Protocol("short string is longer than 255 bytes".to_owned()));
        }
        let mut writer = vec![];
        try!(writer.write_u8(self.cluster_id.len() as u8));
        try!(writer.write_all(self.cluster_id.as_bytes()));
        Ok(writer)
    }
}

impl GetEmpty {
    /// Creates a `GetEmpty` with an empty `cluster_id`.
    pub fn with_default_values() -> GetEmpty {
        GetEmpty { cluster_id: "".to_owned() }
    }
}
/// AMQP `basic.ack` method (class 60, method 80).
#[derive(Debug)]
pub struct Ack {
    /// Sent first, as a big-endian `u64`.
    pub delivery_tag: u64,
    /// Sent last, in the least significant bit of a single flag octet.
    pub multiple: bool,
}

impl Method for Ack {
    fn class_id(&self) -> u16 {
        60
    }

    fn id(&self) -> u16 {
        80
    }

    fn name(&self) -> &'static str {
        "basic.ack"
    }

    /// Parses the arguments of a `basic.ack` frame.
    ///
    /// Fails with `DecodeError` when the frame is not class 60 / method 80,
    /// and propagates any read error caused by a short payload.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Ack> {
        if (method_frame.class_id, method_frame.method_id) != (60, 80) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let delivery_tag = try!(input.read_u64::<BigEndian>());
        let flag_octet = try!(input.read_u8());
        // BitVec numbers bits from the most significant one, so index 7 is the lowest bit.
        let multiple = try!(BitVec::from_bytes(&[flag_octet])
            .get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        Ok(Ack {
            delivery_tag: delivery_tag,
            multiple: multiple,
        })
    }

    /// Serialises the delivery tag followed by the flag octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut flag_bits = BitVec::from_elem(8, false);
        flag_bits.set(7, self.multiple);

        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_u64::<BigEndian>(self.delivery_tag));
        try!(payload.write_all(&flag_bits.to_bytes()));
        Ok(payload)
    }
}

impl Ack {
    /// Creates an `Ack` with delivery tag 0 and the given `multiple` flag.
    pub fn with_default_values(multiple: bool) -> Ack {
        Ack {
            multiple: multiple,
            delivery_tag: 0,
        }
    }
}
/// AMQP `basic.reject` method (class 60, method 90).
#[derive(Debug)]
pub struct Reject {
    /// Sent first, as a big-endian `u64`.
    pub delivery_tag: u64,
    /// Sent last, in the least significant bit of a single flag octet.
    pub requeue: bool,
}

impl Method for Reject {
    fn class_id(&self) -> u16 {
        60
    }

    fn id(&self) -> u16 {
        90
    }

    fn name(&self) -> &'static str {
        "basic.reject"
    }

    /// Parses the arguments of a `basic.reject` frame.
    ///
    /// Fails with `DecodeError` when the frame is not class 60 / method 90,
    /// and propagates any read error caused by a short payload.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Reject> {
        if (method_frame.class_id, method_frame.method_id) != (60, 90) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let delivery_tag = try!(input.read_u64::<BigEndian>());
        let flag_octet = try!(input.read_u8());
        // BitVec numbers bits from the most significant one, so index 7 is the lowest bit.
        let requeue = try!(BitVec::from_bytes(&[flag_octet])
            .get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        Ok(Reject {
            delivery_tag: delivery_tag,
            requeue: requeue,
        })
    }

    /// Serialises the delivery tag followed by the flag octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut flag_bits = BitVec::from_elem(8, false);
        flag_bits.set(7, self.requeue);

        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_u64::<BigEndian>(self.delivery_tag));
        try!(payload.write_all(&flag_bits.to_bytes()));
        Ok(payload)
    }
}

impl Reject {
    /// Creates a `Reject` for `delivery_tag` with `requeue` set to `true`.
    pub fn with_default_values(delivery_tag: u64) -> Reject {
        Reject {
            delivery_tag: delivery_tag,
            requeue: true,
        }
    }
}
/// AMQP `basic.recover-async` method (class 60, method 100).
#[derive(Debug)]
pub struct RecoverAsync {
    /// Carried in the least significant bit of the single flag octet.
    pub requeue: bool,
}

impl Method for RecoverAsync {
    fn class_id(&self) -> u16 {
        60
    }

    fn id(&self) -> u16 {
        100
    }

    fn name(&self) -> &'static str {
        "basic.recover-async"
    }

    /// Parses the single flag octet of a `basic.recover-async` frame.
    ///
    /// Fails with `DecodeError` when the frame is not class 60 / method 100,
    /// and propagates the read error when the payload is empty.
    fn decode(method_frame: MethodFrame) -> AMQPResult<RecoverAsync> {
        if (method_frame.class_id, method_frame.method_id) != (60, 100) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let flag_octet = try!(input.read_u8());
        // BitVec numbers bits from the most significant one, so index 7 is the lowest bit.
        let requeue = try!(BitVec::from_bytes(&[flag_octet])
            .get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        Ok(RecoverAsync { requeue: requeue })
    }

    /// Serialises the flag octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut flag_bits = BitVec::from_elem(8, false);
        flag_bits.set(7, self.requeue);

        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_all(&flag_bits.to_bytes()));
        Ok(payload)
    }
}
/// AMQP `basic.recover` method (class 60, method 110).
#[derive(Debug)]
pub struct Recover {
    /// Carried in the least significant bit of the single flag octet.
    pub requeue: bool,
}

impl Method for Recover {
    fn class_id(&self) -> u16 {
        60
    }

    fn id(&self) -> u16 {
        110
    }

    fn name(&self) -> &'static str {
        "basic.recover"
    }

    /// Parses the single flag octet of a `basic.recover` frame.
    ///
    /// Fails with `DecodeError` when the frame is not class 60 / method 110,
    /// and propagates the read error when the payload is empty.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Recover> {
        if (method_frame.class_id, method_frame.method_id) != (60, 110) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let flag_octet = try!(input.read_u8());
        // BitVec numbers bits from the most significant one, so index 7 is the lowest bit.
        let requeue = try!(BitVec::from_bytes(&[flag_octet])
            .get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        Ok(Recover { requeue: requeue })
    }

    /// Serialises the flag octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut flag_bits = BitVec::from_elem(8, false);
        flag_bits.set(7, self.requeue);

        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_all(&flag_bits.to_bytes()));
        Ok(payload)
    }
}
/// AMQP `basic.recover-ok` method (class 60, method 111); it carries no arguments.
#[derive(Debug)]
pub struct RecoverOk;

impl Method for RecoverOk {
    fn class_id(&self) -> u16 {
        60
    }

    fn id(&self) -> u16 {
        111
    }

    fn name(&self) -> &'static str {
        "basic.recover-ok"
    }

    /// Accepts the frame only when it is addressed to class 60 / method 111.
    fn decode(method_frame: MethodFrame) -> AMQPResult<RecoverOk> {
        if method_frame.class_id == 60 && method_frame.method_id == 111 {
            Ok(RecoverOk)
        } else {
            Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
        }
    }

    /// Produces an empty argument payload.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        Ok(Vec::new())
    }
}
/// AMQP `basic.nack` method (class 60, method 120).
#[derive(Debug)]
pub struct Nack {
    /// Sent first, as a big-endian `u64`.
    pub delivery_tag: u64,
    /// Lowest bit of the trailing flag octet.
    pub multiple: bool,
    /// Second-lowest bit of the trailing flag octet.
    pub requeue: bool,
}

impl Method for Nack {
    fn class_id(&self) -> u16 {
        60
    }

    fn id(&self) -> u16 {
        120
    }

    fn name(&self) -> &'static str {
        "basic.nack"
    }

    /// Parses the arguments of a `basic.nack` frame.
    ///
    /// Fails with `DecodeError` when the frame is not class 60 / method 120,
    /// and propagates any read error caused by a short payload.
    fn decode(method_frame: MethodFrame) -> AMQPResult<Nack> {
        if (method_frame.class_id, method_frame.method_id) != (60, 120) {
            return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
        }
        let mut input = &method_frame.arguments[..];
        let delivery_tag = try!(input.read_u64::<BigEndian>());
        let flag_octet = try!(input.read_u8());
        // BitVec numbers bits from the most significant one, so index 7 is
        // the lowest bit and index 6 the one above it.
        let flag_bits = BitVec::from_bytes(&[flag_octet]);
        let multiple = try!(flag_bits.get(7)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        let requeue = try!(flag_bits.get(6)
            .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
        Ok(Nack {
            delivery_tag: delivery_tag,
            multiple: multiple,
            requeue: requeue,
        })
    }

    /// Serialises the delivery tag followed by the flag octet.
    fn encode(&self) -> AMQPResult<Vec<u8>> {
        let mut flag_bits = BitVec::from_elem(8, false);
        flag_bits.set(7, self.multiple);
        flag_bits.set(6, self.requeue);

        let mut payload: Vec<u8> = Vec::new();
        try!(payload.write_u64::<BigEndian>(self.delivery_tag));
        try!(payload.write_all(&flag_bits.to_bytes()));
        Ok(payload)
    }
}

impl Nack {
    /// Creates a `Nack` with delivery tag 0, `requeue` set to `true`, and the
    /// given `multiple` flag.
    pub fn with_default_values(multiple: bool) -> Nack {
        Nack {
            multiple: multiple,
            requeue: true,
            delivery_tag: 0,
        }
    }
}
}
/// Methods of the AMQP `tx` class (class id 90). None of them carries arguments,
/// so every `encode` yields an empty payload and every `decode` only checks
/// the class/method ids of the incoming frame.
#[allow(unused_imports)]
#[allow(missing_copy_implementations)]
pub mod tx {
    use bit_vec::BitVec;
    use table;
    use table::{Table, decode_table, encode_table};
    use protocol::{Method, MethodFrame};
    use framing::ContentHeaderFrame;
    use amqp_error::{AMQPResult, AMQPError};
    use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
    use std::io::{Read, Write};

    /// `tx.select` (class 90, method 10).
    #[derive(Debug)]
    pub struct Select;

    impl Method for Select {
        fn class_id(&self) -> u16 {
            90
        }

        fn id(&self) -> u16 {
            10
        }

        fn name(&self) -> &'static str {
            "tx.select"
        }

        /// Accepts the frame only when it is addressed to class 90 / method 10.
        fn decode(method_frame: MethodFrame) -> AMQPResult<Select> {
            if method_frame.class_id == 90 && method_frame.method_id == 10 {
                Ok(Select)
            } else {
                Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        }

        /// Produces an empty argument payload.
        fn encode(&self) -> AMQPResult<Vec<u8>> {
            Ok(Vec::new())
        }
    }

    /// `tx.select-ok` (class 90, method 11).
    #[derive(Debug)]
    pub struct SelectOk;

    impl Method for SelectOk {
        fn class_id(&self) -> u16 {
            90
        }

        fn id(&self) -> u16 {
            11
        }

        fn name(&self) -> &'static str {
            "tx.select-ok"
        }

        /// Accepts the frame only when it is addressed to class 90 / method 11.
        fn decode(method_frame: MethodFrame) -> AMQPResult<SelectOk> {
            if method_frame.class_id == 90 && method_frame.method_id == 11 {
                Ok(SelectOk)
            } else {
                Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        }

        /// Produces an empty argument payload.
        fn encode(&self) -> AMQPResult<Vec<u8>> {
            Ok(Vec::new())
        }
    }

    /// `tx.commit` (class 90, method 20).
    #[derive(Debug)]
    pub struct Commit;

    impl Method for Commit {
        fn class_id(&self) -> u16 {
            90
        }

        fn id(&self) -> u16 {
            20
        }

        fn name(&self) -> &'static str {
            "tx.commit"
        }

        /// Accepts the frame only when it is addressed to class 90 / method 20.
        fn decode(method_frame: MethodFrame) -> AMQPResult<Commit> {
            if method_frame.class_id == 90 && method_frame.method_id == 20 {
                Ok(Commit)
            } else {
                Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        }

        /// Produces an empty argument payload.
        fn encode(&self) -> AMQPResult<Vec<u8>> {
            Ok(Vec::new())
        }
    }

    /// `tx.commit-ok` (class 90, method 21).
    #[derive(Debug)]
    pub struct CommitOk;

    impl Method for CommitOk {
        fn class_id(&self) -> u16 {
            90
        }

        fn id(&self) -> u16 {
            21
        }

        fn name(&self) -> &'static str {
            "tx.commit-ok"
        }

        /// Accepts the frame only when it is addressed to class 90 / method 21.
        fn decode(method_frame: MethodFrame) -> AMQPResult<CommitOk> {
            if method_frame.class_id == 90 && method_frame.method_id == 21 {
                Ok(CommitOk)
            } else {
                Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        }

        /// Produces an empty argument payload.
        fn encode(&self) -> AMQPResult<Vec<u8>> {
            Ok(Vec::new())
        }
    }

    /// `tx.rollback` (class 90, method 30).
    #[derive(Debug)]
    pub struct Rollback;

    impl Method for Rollback {
        fn class_id(&self) -> u16 {
            90
        }

        fn id(&self) -> u16 {
            30
        }

        fn name(&self) -> &'static str {
            "tx.rollback"
        }

        /// Accepts the frame only when it is addressed to class 90 / method 30.
        fn decode(method_frame: MethodFrame) -> AMQPResult<Rollback> {
            if method_frame.class_id == 90 && method_frame.method_id == 30 {
                Ok(Rollback)
            } else {
                Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        }

        /// Produces an empty argument payload.
        fn encode(&self) -> AMQPResult<Vec<u8>> {
            Ok(Vec::new())
        }
    }

    /// `tx.rollback-ok` (class 90, method 31).
    #[derive(Debug)]
    pub struct RollbackOk;

    impl Method for RollbackOk {
        fn class_id(&self) -> u16 {
            90
        }

        fn id(&self) -> u16 {
            31
        }

        fn name(&self) -> &'static str {
            "tx.rollback-ok"
        }

        /// Accepts the frame only when it is addressed to class 90 / method 31.
        fn decode(method_frame: MethodFrame) -> AMQPResult<RollbackOk> {
            if method_frame.class_id == 90 && method_frame.method_id == 31 {
                Ok(RollbackOk)
            } else {
                Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        }

        /// Produces an empty argument payload.
        fn encode(&self) -> AMQPResult<Vec<u8>> {
            Ok(Vec::new())
        }
    }
}
/// Methods of the AMQP `confirm` class (class id 85).
#[allow(unused_imports)]
#[allow(missing_copy_implementations)]
pub mod confirm {
    use bit_vec::BitVec;
    use table;
    use table::{Table, decode_table, encode_table};
    use protocol::{Method, MethodFrame};
    use framing::ContentHeaderFrame;
    use amqp_error::{AMQPResult, AMQPError};
    use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
    use std::io::{Read, Write};

    /// `confirm.select` (class 85, method 10).
    #[derive(Debug)]
    pub struct Select {
        /// Carried in the least significant bit of the single flag octet.
        pub nowait: bool,
    }

    impl Method for Select {
        fn class_id(&self) -> u16 {
            85
        }

        fn id(&self) -> u16 {
            10
        }

        fn name(&self) -> &'static str {
            "confirm.select"
        }

        /// Parses the single flag octet of a `confirm.select` frame.
        ///
        /// Fails with `DecodeError` when the frame is not class 85 / method 10,
        /// and propagates the read error when the payload is empty.
        fn decode(method_frame: MethodFrame) -> AMQPResult<Select> {
            if (method_frame.class_id, method_frame.method_id) != (85, 10) {
                return Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"));
            }
            let mut input = &method_frame.arguments[..];
            let flag_octet = try!(input.read_u8());
            // BitVec numbers bits from the most significant one, so index 7 is the lowest bit.
            let nowait = try!(BitVec::from_bytes(&[flag_octet])
                .get(7)
                .ok_or_else(|| AMQPError::Protocol("Bitmap is not correct".to_owned())));
            Ok(Select { nowait: nowait })
        }

        /// Serialises the flag octet.
        fn encode(&self) -> AMQPResult<Vec<u8>> {
            let mut flag_bits = BitVec::from_elem(8, false);
            flag_bits.set(7, self.nowait);

            let mut payload: Vec<u8> = Vec::new();
            try!(payload.write_all(&flag_bits.to_bytes()));
            Ok(payload)
        }
    }

    /// `confirm.select-ok` (class 85, method 11); it carries no arguments.
    #[derive(Debug)]
    pub struct SelectOk;

    impl Method for SelectOk {
        fn class_id(&self) -> u16 {
            85
        }

        fn id(&self) -> u16 {
            11
        }

        fn name(&self) -> &'static str {
            "confirm.select-ok"
        }

        /// Accepts the frame only when it is addressed to class 85 / method 11.
        fn decode(method_frame: MethodFrame) -> AMQPResult<SelectOk> {
            if method_frame.class_id == 85 && method_frame.method_id == 11 {
                Ok(SelectOk)
            } else {
                Err(AMQPError::DecodeError("Frame class_id & method_id didn't match"))
            }
        }

        /// Produces an empty argument payload.
        fn encode(&self) -> AMQPResult<Vec<u8>> {
            Ok(Vec::new())
        }
    }
}