pub mod metadata {
use super::*;
pub const NAME: &str = "AMQP";
pub const MAJOR_VERSION: ShortShortUInt = 0;
pub const MINOR_VERSION: ShortShortUInt = 9;
pub const REVISION: ShortShortUInt = 1;
pub const PORT: LongUInt = 5672;
pub const COPYRIGHT: &str = r#"Copyright (C) 2007-2024 Broadcom Inc. and its subsidiaries. All rights reserved.
Permission is hereby granted, free of charge, to any person
obtaining a copy of this file (the "Software"), to deal in the
Software without restriction, including without limitation the
rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit
persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
Class information entered from amqp_xml0-8.pdf and domain types from amqp-xml-doc0-9.pdf
Updated for 0-9-1 by Tony Garnock-Jones
b3cb053f15e7b98808c0ccc67f23cb3e amqp_xml0-8.pdf
http://twiststandards.org/?option=com_docman&task=cat_view&gid=28&Itemid=90
8444db91e2949dbecfb2585e9eef6d64 amqp-xml-doc0-9.pdf
https://jira.amqp.org/confluence/download/attachments/720900/amqp-xml-doc0-9.pdf?version=1
"#;
}
pub mod constants {
use super::*;
pub const FRAME_METHOD: ShortShortUInt = 1;
pub const FRAME_HEADER: ShortShortUInt = 2;
pub const FRAME_BODY: ShortShortUInt = 3;
pub const FRAME_HEARTBEAT: ShortShortUInt = 8;
pub const FRAME_MIN_SIZE: LongUInt = 4096;
pub const FRAME_END: ShortShortUInt = 206;
pub const REPLY_SUCCESS: ShortUInt = 200;
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPSoftError {
CONTENTTOOLARGE,
NOROUTE,
NOCONSUMERS,
ACCESSREFUSED,
NOTFOUND,
RESOURCELOCKED,
PRECONDITIONFAILED,
}
impl AMQPSoftError {
pub fn get_id(&self) -> Identifier {
match *self {
AMQPSoftError::CONTENTTOOLARGE => 311,
AMQPSoftError::NOROUTE => 312,
AMQPSoftError::NOCONSUMERS => 313,
AMQPSoftError::ACCESSREFUSED => 403,
AMQPSoftError::NOTFOUND => 404,
AMQPSoftError::RESOURCELOCKED => 405,
AMQPSoftError::PRECONDITIONFAILED => 406,
}
}
pub fn from_id(id: Identifier) -> Option<AMQPSoftError> {
match id {
311 => Some(AMQPSoftError::CONTENTTOOLARGE),
312 => Some(AMQPSoftError::NOROUTE),
313 => Some(AMQPSoftError::NOCONSUMERS),
403 => Some(AMQPSoftError::ACCESSREFUSED),
404 => Some(AMQPSoftError::NOTFOUND),
405 => Some(AMQPSoftError::RESOURCELOCKED),
406 => Some(AMQPSoftError::PRECONDITIONFAILED),
_ => None,
}
}
}
impl fmt::Display for AMQPSoftError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AMQPSoftError::CONTENTTOOLARGE => write!(f, "CONTENT-TOO-LARGE"),
AMQPSoftError::NOROUTE => write!(f, "NO-ROUTE"),
AMQPSoftError::NOCONSUMERS => write!(f, "NO-CONSUMERS"),
AMQPSoftError::ACCESSREFUSED => write!(f, "ACCESS-REFUSED"),
AMQPSoftError::NOTFOUND => write!(f, "NOT-FOUND"),
AMQPSoftError::RESOURCELOCKED => write!(f, "RESOURCE-LOCKED"),
AMQPSoftError::PRECONDITIONFAILED => write!(f, "PRECONDITION-FAILED"),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPHardError {
CONNECTIONFORCED,
INVALIDPATH,
FRAMEERROR,
SYNTAXERROR,
COMMANDINVALID,
CHANNELERROR,
UNEXPECTEDFRAME,
RESOURCEERROR,
NOTALLOWED,
NOTIMPLEMENTED,
INTERNALERROR,
}
impl AMQPHardError {
pub fn get_id(&self) -> Identifier {
match *self {
AMQPHardError::CONNECTIONFORCED => 320,
AMQPHardError::INVALIDPATH => 402,
AMQPHardError::FRAMEERROR => 501,
AMQPHardError::SYNTAXERROR => 502,
AMQPHardError::COMMANDINVALID => 503,
AMQPHardError::CHANNELERROR => 504,
AMQPHardError::UNEXPECTEDFRAME => 505,
AMQPHardError::RESOURCEERROR => 506,
AMQPHardError::NOTALLOWED => 530,
AMQPHardError::NOTIMPLEMENTED => 540,
AMQPHardError::INTERNALERROR => 541,
}
}
pub fn from_id(id: Identifier) -> Option<AMQPHardError> {
match id {
320 => Some(AMQPHardError::CONNECTIONFORCED),
402 => Some(AMQPHardError::INVALIDPATH),
501 => Some(AMQPHardError::FRAMEERROR),
502 => Some(AMQPHardError::SYNTAXERROR),
503 => Some(AMQPHardError::COMMANDINVALID),
504 => Some(AMQPHardError::CHANNELERROR),
505 => Some(AMQPHardError::UNEXPECTEDFRAME),
506 => Some(AMQPHardError::RESOURCEERROR),
530 => Some(AMQPHardError::NOTALLOWED),
540 => Some(AMQPHardError::NOTIMPLEMENTED),
541 => Some(AMQPHardError::INTERNALERROR),
_ => None,
}
}
}
impl fmt::Display for AMQPHardError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AMQPHardError::CONNECTIONFORCED => write!(f, "CONNECTION-FORCED"),
AMQPHardError::INVALIDPATH => write!(f, "INVALID-PATH"),
AMQPHardError::FRAMEERROR => write!(f, "FRAME-ERROR"),
AMQPHardError::SYNTAXERROR => write!(f, "SYNTAX-ERROR"),
AMQPHardError::COMMANDINVALID => write!(f, "COMMAND-INVALID"),
AMQPHardError::CHANNELERROR => write!(f, "CHANNEL-ERROR"),
AMQPHardError::UNEXPECTEDFRAME => write!(f, "UNEXPECTED-FRAME"),
AMQPHardError::RESOURCEERROR => write!(f, "RESOURCE-ERROR"),
AMQPHardError::NOTALLOWED => write!(f, "NOT-ALLOWED"),
AMQPHardError::NOTIMPLEMENTED => write!(f, "NOT-IMPLEMENTED"),
AMQPHardError::INTERNALERROR => write!(f, "INTERNAL-ERROR"),
}
}
}
use self::access::parse_access;
use self::basic::parse_basic;
use self::channel::parse_channel;
use self::confirm::parse_confirm;
use self::connection::parse_connection;
use self::exchange::parse_exchange;
use self::queue::parse_queue;
use self::tx::parse_tx;
pub fn parse_class<I: ParsableInput>(i: I) -> ParserResult<I, AMQPClass> {
context(
"parse_class",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
60 => map(map(parse_basic, AMQPClass::Basic), Some)(i),
10 => map(map(parse_connection, AMQPClass::Connection), Some)(i),
20 => map(map(parse_channel, AMQPClass::Channel), Some)(i),
30 => map(map(parse_access, AMQPClass::Access), Some)(i),
40 => map(map(parse_exchange, AMQPClass::Exchange), Some)(i),
50 => map(map(parse_queue, AMQPClass::Queue), Some)(i),
90 => map(map(parse_tx, AMQPClass::Tx), Some)(i),
85 => map(map(parse_confirm, AMQPClass::Confirm), Some)(i),
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_class<'a, W: Write + BackToTheBuffer + 'a>(
class: &'a AMQPClass,
) -> impl SerializeFn<W> + 'a {
move |input| match *class {
AMQPClass::Basic(ref basic) => basic::gen_basic(basic)(input),
AMQPClass::Connection(ref connection) => connection::gen_connection(connection)(input),
AMQPClass::Channel(ref channel) => channel::gen_channel(channel)(input),
AMQPClass::Access(ref access) => access::gen_access(access)(input),
AMQPClass::Exchange(ref exchange) => exchange::gen_exchange(exchange)(input),
AMQPClass::Queue(ref queue) => queue::gen_queue(queue)(input),
AMQPClass::Tx(ref tx) => tx::gen_tx(tx)(input),
AMQPClass::Confirm(ref confirm) => confirm::gen_confirm(confirm)(input),
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPClass {
Basic(basic::AMQPMethod),
Connection(connection::AMQPMethod),
Channel(channel::AMQPMethod),
Access(access::AMQPMethod),
Exchange(exchange::AMQPMethod),
Queue(queue::AMQPMethod),
Tx(tx::AMQPMethod),
Confirm(confirm::AMQPMethod),
}
impl AMQPClass {
pub fn get_amqp_class_id(&self) -> Identifier {
match self {
AMQPClass::Basic(_) => 60,
AMQPClass::Connection(_) => 10,
AMQPClass::Channel(_) => 20,
AMQPClass::Access(_) => 30,
AMQPClass::Exchange(_) => 40,
AMQPClass::Queue(_) => 50,
AMQPClass::Tx(_) => 90,
AMQPClass::Confirm(_) => 85,
}
}
pub fn get_amqp_method_id(&self) -> Identifier {
match self {
AMQPClass::Basic(basic::AMQPMethod::Qos(_)) => 10,
AMQPClass::Basic(basic::AMQPMethod::QosOk(_)) => 11,
AMQPClass::Basic(basic::AMQPMethod::Consume(_)) => 20,
AMQPClass::Basic(basic::AMQPMethod::ConsumeOk(_)) => 21,
AMQPClass::Basic(basic::AMQPMethod::Cancel(_)) => 30,
AMQPClass::Basic(basic::AMQPMethod::CancelOk(_)) => 31,
AMQPClass::Basic(basic::AMQPMethod::Publish(_)) => 40,
AMQPClass::Basic(basic::AMQPMethod::Return(_)) => 50,
AMQPClass::Basic(basic::AMQPMethod::Deliver(_)) => 60,
AMQPClass::Basic(basic::AMQPMethod::Get(_)) => 70,
AMQPClass::Basic(basic::AMQPMethod::GetOk(_)) => 71,
AMQPClass::Basic(basic::AMQPMethod::GetEmpty(_)) => 72,
AMQPClass::Basic(basic::AMQPMethod::Ack(_)) => 80,
AMQPClass::Basic(basic::AMQPMethod::Reject(_)) => 90,
AMQPClass::Basic(basic::AMQPMethod::RecoverAsync(_)) => 100,
AMQPClass::Basic(basic::AMQPMethod::Recover(_)) => 110,
AMQPClass::Basic(basic::AMQPMethod::RecoverOk(_)) => 111,
AMQPClass::Basic(basic::AMQPMethod::Nack(_)) => 120,
AMQPClass::Connection(connection::AMQPMethod::Start(_)) => 10,
AMQPClass::Connection(connection::AMQPMethod::StartOk(_)) => 11,
AMQPClass::Connection(connection::AMQPMethod::Secure(_)) => 20,
AMQPClass::Connection(connection::AMQPMethod::SecureOk(_)) => 21,
AMQPClass::Connection(connection::AMQPMethod::Tune(_)) => 30,
AMQPClass::Connection(connection::AMQPMethod::TuneOk(_)) => 31,
AMQPClass::Connection(connection::AMQPMethod::Open(_)) => 40,
AMQPClass::Connection(connection::AMQPMethod::OpenOk(_)) => 41,
AMQPClass::Connection(connection::AMQPMethod::Close(_)) => 50,
AMQPClass::Connection(connection::AMQPMethod::CloseOk(_)) => 51,
AMQPClass::Connection(connection::AMQPMethod::Blocked(_)) => 60,
AMQPClass::Connection(connection::AMQPMethod::Unblocked(_)) => 61,
AMQPClass::Connection(connection::AMQPMethod::UpdateSecret(_)) => 70,
AMQPClass::Connection(connection::AMQPMethod::UpdateSecretOk(_)) => 71,
AMQPClass::Channel(channel::AMQPMethod::Open(_)) => 10,
AMQPClass::Channel(channel::AMQPMethod::OpenOk(_)) => 11,
AMQPClass::Channel(channel::AMQPMethod::Flow(_)) => 20,
AMQPClass::Channel(channel::AMQPMethod::FlowOk(_)) => 21,
AMQPClass::Channel(channel::AMQPMethod::Close(_)) => 40,
AMQPClass::Channel(channel::AMQPMethod::CloseOk(_)) => 41,
AMQPClass::Access(access::AMQPMethod::Request(_)) => 10,
AMQPClass::Access(access::AMQPMethod::RequestOk(_)) => 11,
AMQPClass::Exchange(exchange::AMQPMethod::Declare(_)) => 10,
AMQPClass::Exchange(exchange::AMQPMethod::DeclareOk(_)) => 11,
AMQPClass::Exchange(exchange::AMQPMethod::Delete(_)) => 20,
AMQPClass::Exchange(exchange::AMQPMethod::DeleteOk(_)) => 21,
AMQPClass::Exchange(exchange::AMQPMethod::Bind(_)) => 30,
AMQPClass::Exchange(exchange::AMQPMethod::BindOk(_)) => 31,
AMQPClass::Exchange(exchange::AMQPMethod::Unbind(_)) => 40,
AMQPClass::Exchange(exchange::AMQPMethod::UnbindOk(_)) => 51,
AMQPClass::Queue(queue::AMQPMethod::Declare(_)) => 10,
AMQPClass::Queue(queue::AMQPMethod::DeclareOk(_)) => 11,
AMQPClass::Queue(queue::AMQPMethod::Bind(_)) => 20,
AMQPClass::Queue(queue::AMQPMethod::BindOk(_)) => 21,
AMQPClass::Queue(queue::AMQPMethod::Purge(_)) => 30,
AMQPClass::Queue(queue::AMQPMethod::PurgeOk(_)) => 31,
AMQPClass::Queue(queue::AMQPMethod::Delete(_)) => 40,
AMQPClass::Queue(queue::AMQPMethod::DeleteOk(_)) => 41,
AMQPClass::Queue(queue::AMQPMethod::Unbind(_)) => 50,
AMQPClass::Queue(queue::AMQPMethod::UnbindOk(_)) => 51,
AMQPClass::Tx(tx::AMQPMethod::Select(_)) => 10,
AMQPClass::Tx(tx::AMQPMethod::SelectOk(_)) => 11,
AMQPClass::Tx(tx::AMQPMethod::Commit(_)) => 20,
AMQPClass::Tx(tx::AMQPMethod::CommitOk(_)) => 21,
AMQPClass::Tx(tx::AMQPMethod::Rollback(_)) => 30,
AMQPClass::Tx(tx::AMQPMethod::RollbackOk(_)) => 31,
AMQPClass::Confirm(confirm::AMQPMethod::Select(_)) => 10,
AMQPClass::Confirm(confirm::AMQPMethod::SelectOk(_)) => 11,
}
}
}
pub mod basic {
use super::*;
pub fn parse_basic<I: ParsableInput>(i: I) -> ParserResult<I, basic::AMQPMethod> {
context(
"parse_basic",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
10 => context("parse_qos", map(map(parse_qos, AMQPMethod::Qos), Some))(i),
11 => context(
"parse_qos_ok",
map(map(parse_qos_ok, AMQPMethod::QosOk), Some),
)(i),
20 => context(
"parse_consume",
map(map(parse_consume, AMQPMethod::Consume), Some),
)(i),
21 => context(
"parse_consume_ok",
map(map(parse_consume_ok, AMQPMethod::ConsumeOk), Some),
)(i),
30 => context(
"parse_cancel",
map(map(parse_cancel, AMQPMethod::Cancel), Some),
)(i),
31 => context(
"parse_cancel_ok",
map(map(parse_cancel_ok, AMQPMethod::CancelOk), Some),
)(i),
40 => context(
"parse_publish",
map(map(parse_publish, AMQPMethod::Publish), Some),
)(i),
50 => context(
"parse_return",
map(map(parse_return, AMQPMethod::Return), Some),
)(i),
60 => context(
"parse_deliver",
map(map(parse_deliver, AMQPMethod::Deliver), Some),
)(i),
70 => context("parse_get", map(map(parse_get, AMQPMethod::Get), Some))(i),
71 => context(
"parse_get_ok",
map(map(parse_get_ok, AMQPMethod::GetOk), Some),
)(i),
72 => context(
"parse_get_empty",
map(map(parse_get_empty, AMQPMethod::GetEmpty), Some),
)(i),
80 => context("parse_ack", map(map(parse_ack, AMQPMethod::Ack), Some))(i),
90 => context(
"parse_reject",
map(map(parse_reject, AMQPMethod::Reject), Some),
)(i),
100 => context(
"parse_recover_async",
map(map(parse_recover_async, AMQPMethod::RecoverAsync), Some),
)(i),
110 => context(
"parse_recover",
map(map(parse_recover, AMQPMethod::Recover), Some),
)(i),
111 => context(
"parse_recover_ok",
map(map(parse_recover_ok, AMQPMethod::RecoverOk), Some),
)(i),
120 => {
context("parse_nack", map(map(parse_nack, AMQPMethod::Nack), Some))(i)
}
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_basic<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a AMQPMethod,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_id(60), move |input| match *method {
AMQPMethod::Qos(ref qos) => gen_qos(qos)(input),
AMQPMethod::QosOk(ref qos_ok) => gen_qos_ok(qos_ok)(input),
AMQPMethod::Consume(ref consume) => gen_consume(consume)(input),
AMQPMethod::ConsumeOk(ref consume_ok) => gen_consume_ok(consume_ok)(input),
AMQPMethod::Cancel(ref cancel) => gen_cancel(cancel)(input),
AMQPMethod::CancelOk(ref cancel_ok) => gen_cancel_ok(cancel_ok)(input),
AMQPMethod::Publish(ref publish) => gen_publish(publish)(input),
AMQPMethod::Return(ref r#return) => gen_return(r#return)(input),
AMQPMethod::Deliver(ref deliver) => gen_deliver(deliver)(input),
AMQPMethod::Get(ref get) => gen_get(get)(input),
AMQPMethod::GetOk(ref get_ok) => gen_get_ok(get_ok)(input),
AMQPMethod::GetEmpty(ref get_empty) => gen_get_empty(get_empty)(input),
AMQPMethod::Ack(ref ack) => gen_ack(ack)(input),
AMQPMethod::Reject(ref reject) => gen_reject(reject)(input),
AMQPMethod::RecoverAsync(ref recover_async) => gen_recover_async(recover_async)(input),
AMQPMethod::Recover(ref recover) => gen_recover(recover)(input),
AMQPMethod::RecoverOk(ref recover_ok) => gen_recover_ok(recover_ok)(input),
AMQPMethod::Nack(ref nack) => gen_nack(nack)(input),
})
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPMethod {
Qos(Qos),
QosOk(QosOk),
Consume(Consume),
ConsumeOk(ConsumeOk),
Cancel(Cancel),
CancelOk(CancelOk),
Publish(Publish),
Return(Return),
Deliver(Deliver),
Get(Get),
GetOk(GetOk),
GetEmpty(GetEmpty),
Ack(Ack),
Reject(Reject),
RecoverAsync(RecoverAsync),
Recover(Recover),
RecoverOk(RecoverOk),
Nack(Nack),
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Qos {
pub prefetch_count: ShortUInt,
pub global: Boolean,
}
impl Qos {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
10
}
}
pub fn parse_qos<I: ParsableInput>(i: I) -> ParserResult<I, Qos> {
let (i, _) = parse_long_uint(i)?;
let (i, prefetch_count) = parse_short_uint(i)?;
let (i, flags) = parse_flags(i, &["global"])?;
Ok((
i,
Qos {
prefetch_count,
global: flags.get_flag("global").unwrap_or(false),
},
))
}
pub fn gen_qos<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Qos,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("global".to_string(), method.global);
input = gen_id(10)(input)?;
input = gen_long_uint(0)(input)?;
input = gen_short_uint(method.prefetch_count)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct QosOk {}
impl QosOk {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
11
}
}
pub fn parse_qos_ok<I: ParsableInput>(i: I) -> ParserResult<I, QosOk> {
Ok((i, QosOk {}))
}
pub fn gen_qos_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a QosOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(11)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Consume {
pub queue: ShortString,
pub consumer_tag: ShortString,
pub no_local: Boolean,
pub no_ack: Boolean,
pub exclusive: Boolean,
pub nowait: Boolean,
pub arguments: FieldTable,
}
impl Consume {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
20
}
}
pub fn parse_consume<I: ParsableInput>(i: I) -> ParserResult<I, Consume> {
let (i, _) = parse_short_uint(i)?;
let (i, queue) = parse_short_string(i)?;
let (i, consumer_tag) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["no-local", "no-ack", "exclusive", "nowait"])?;
let (i, arguments) = parse_field_table(i)?;
Ok((
i,
Consume {
queue,
consumer_tag,
no_local: flags.get_flag("no_local").unwrap_or(false),
no_ack: flags.get_flag("no_ack").unwrap_or(false),
exclusive: flags.get_flag("exclusive").unwrap_or(false),
nowait: flags.get_flag("nowait").unwrap_or(false),
arguments,
},
))
}
pub fn gen_consume<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Consume,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("no_local".to_string(), method.no_local);
flags.add_flag("no_ack".to_string(), method.no_ack);
flags.add_flag("exclusive".to_string(), method.exclusive);
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(20)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.queue.as_str())(input)?;
input = gen_short_string(method.consumer_tag.as_str())(input)?;
input = gen_flags(&flags)(input)?;
input = gen_field_table(&method.arguments)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ConsumeOk {
pub consumer_tag: ShortString,
}
impl ConsumeOk {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
21
}
}
pub fn parse_consume_ok<I: ParsableInput>(i: I) -> ParserResult<I, ConsumeOk> {
let (i, consumer_tag) = parse_short_string(i)?;
Ok((i, ConsumeOk { consumer_tag }))
}
pub fn gen_consume_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a ConsumeOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(21)(input)?;
input = gen_short_string(method.consumer_tag.as_str())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Cancel {
pub consumer_tag: ShortString,
pub nowait: Boolean,
}
impl Cancel {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
30
}
}
pub fn parse_cancel<I: ParsableInput>(i: I) -> ParserResult<I, Cancel> {
let (i, consumer_tag) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["nowait"])?;
Ok((
i,
Cancel {
consumer_tag,
nowait: flags.get_flag("nowait").unwrap_or(false),
},
))
}
pub fn gen_cancel<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Cancel,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(30)(input)?;
input = gen_short_string(method.consumer_tag.as_str())(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CancelOk {
pub consumer_tag: ShortString,
}
impl CancelOk {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
31
}
}
pub fn parse_cancel_ok<I: ParsableInput>(i: I) -> ParserResult<I, CancelOk> {
let (i, consumer_tag) = parse_short_string(i)?;
Ok((i, CancelOk { consumer_tag }))
}
pub fn gen_cancel_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a CancelOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(31)(input)?;
input = gen_short_string(method.consumer_tag.as_str())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Publish {
pub exchange: ShortString,
pub routing_key: ShortString,
pub mandatory: Boolean,
pub immediate: Boolean,
}
impl Publish {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
40
}
}
pub fn parse_publish<I: ParsableInput>(i: I) -> ParserResult<I, Publish> {
let (i, _) = parse_short_uint(i)?;
let (i, exchange) = parse_short_string(i)?;
let (i, routing_key) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["mandatory", "immediate"])?;
Ok((
i,
Publish {
exchange,
routing_key,
mandatory: flags.get_flag("mandatory").unwrap_or(false),
immediate: flags.get_flag("immediate").unwrap_or(false),
},
))
}
pub fn gen_publish<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Publish,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("mandatory".to_string(), method.mandatory);
flags.add_flag("immediate".to_string(), method.immediate);
input = gen_id(40)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.exchange.as_str())(input)?;
input = gen_short_string(method.routing_key.as_str())(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Return {
pub reply_code: ShortUInt,
pub reply_text: ShortString,
pub exchange: ShortString,
pub routing_key: ShortString,
}
impl Return {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
50
}
}
pub fn parse_return<I: ParsableInput>(i: I) -> ParserResult<I, Return> {
let (i, reply_code) = parse_short_uint(i)?;
let (i, reply_text) = parse_short_string(i)?;
let (i, exchange) = parse_short_string(i)?;
let (i, routing_key) = parse_short_string(i)?;
Ok((
i,
Return {
reply_code,
reply_text,
exchange,
routing_key,
},
))
}
pub fn gen_return<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Return,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(50)(input)?;
input = gen_short_uint(method.reply_code)(input)?;
input = gen_short_string(method.reply_text.as_str())(input)?;
input = gen_short_string(method.exchange.as_str())(input)?;
input = gen_short_string(method.routing_key.as_str())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Deliver {
pub consumer_tag: ShortString,
pub delivery_tag: LongLongUInt,
pub redelivered: Boolean,
pub exchange: ShortString,
pub routing_key: ShortString,
}
impl Deliver {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
60
}
}
pub fn parse_deliver<I: ParsableInput>(i: I) -> ParserResult<I, Deliver> {
let (i, consumer_tag) = parse_short_string(i)?;
let (i, delivery_tag) = parse_long_long_uint(i)?;
let (i, flags) = parse_flags(i, &["redelivered"])?;
let (i, exchange) = parse_short_string(i)?;
let (i, routing_key) = parse_short_string(i)?;
Ok((
i,
Deliver {
consumer_tag,
delivery_tag,
redelivered: flags.get_flag("redelivered").unwrap_or(false),
exchange,
routing_key,
},
))
}
pub fn gen_deliver<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Deliver,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("redelivered".to_string(), method.redelivered);
input = gen_id(60)(input)?;
input = gen_short_string(method.consumer_tag.as_str())(input)?;
input = gen_long_long_uint(method.delivery_tag)(input)?;
input = gen_flags(&flags)(input)?;
input = gen_short_string(method.exchange.as_str())(input)?;
input = gen_short_string(method.routing_key.as_str())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Get {
pub queue: ShortString,
pub no_ack: Boolean,
}
impl Get {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
70
}
}
pub fn parse_get<I: ParsableInput>(i: I) -> ParserResult<I, Get> {
let (i, _) = parse_short_uint(i)?;
let (i, queue) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["no-ack"])?;
Ok((
i,
Get {
queue,
no_ack: flags.get_flag("no_ack").unwrap_or(false),
},
))
}
pub fn gen_get<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Get,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("no_ack".to_string(), method.no_ack);
input = gen_id(70)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.queue.as_str())(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GetOk {
pub delivery_tag: LongLongUInt,
pub redelivered: Boolean,
pub exchange: ShortString,
pub routing_key: ShortString,
pub message_count: LongUInt,
}
impl GetOk {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
71
}
}
pub fn parse_get_ok<I: ParsableInput>(i: I) -> ParserResult<I, GetOk> {
let (i, delivery_tag) = parse_long_long_uint(i)?;
let (i, flags) = parse_flags(i, &["redelivered"])?;
let (i, exchange) = parse_short_string(i)?;
let (i, routing_key) = parse_short_string(i)?;
let (i, message_count) = parse_long_uint(i)?;
Ok((
i,
GetOk {
delivery_tag,
redelivered: flags.get_flag("redelivered").unwrap_or(false),
exchange,
routing_key,
message_count,
},
))
}
pub fn gen_get_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a GetOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("redelivered".to_string(), method.redelivered);
input = gen_id(71)(input)?;
input = gen_long_long_uint(method.delivery_tag)(input)?;
input = gen_flags(&flags)(input)?;
input = gen_short_string(method.exchange.as_str())(input)?;
input = gen_short_string(method.routing_key.as_str())(input)?;
input = gen_long_uint(method.message_count)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct GetEmpty {}
impl GetEmpty {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
72
}
}
pub fn parse_get_empty<I: ParsableInput>(i: I) -> ParserResult<I, GetEmpty> {
let (i, _) = parse_short_string(i)?;
Ok((i, GetEmpty {}))
}
pub fn gen_get_empty<'a, W: Write + BackToTheBuffer + 'a>(
_method: &'a GetEmpty,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(72)(input)?;
input = gen_short_string("")(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Ack {
pub delivery_tag: LongLongUInt,
pub multiple: Boolean,
}
impl Ack {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
80
}
}
pub fn parse_ack<I: ParsableInput>(i: I) -> ParserResult<I, Ack> {
let (i, delivery_tag) = parse_long_long_uint(i)?;
let (i, flags) = parse_flags(i, &["multiple"])?;
Ok((
i,
Ack {
delivery_tag,
multiple: flags.get_flag("multiple").unwrap_or(false),
},
))
}
pub fn gen_ack<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Ack,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("multiple".to_string(), method.multiple);
input = gen_id(80)(input)?;
input = gen_long_long_uint(method.delivery_tag)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Reject {
pub delivery_tag: LongLongUInt,
pub requeue: Boolean,
}
impl Reject {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
90
}
}
pub fn parse_reject<I: ParsableInput>(i: I) -> ParserResult<I, Reject> {
let (i, delivery_tag) = parse_long_long_uint(i)?;
let (i, flags) = parse_flags(i, &["requeue"])?;
Ok((
i,
Reject {
delivery_tag,
requeue: flags.get_flag("requeue").unwrap_or(false),
},
))
}
pub fn gen_reject<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Reject,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("requeue".to_string(), method.requeue);
input = gen_id(90)(input)?;
input = gen_long_long_uint(method.delivery_tag)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct RecoverAsync {
pub requeue: Boolean,
}
impl RecoverAsync {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
100
}
}
pub fn parse_recover_async<I: ParsableInput>(i: I) -> ParserResult<I, RecoverAsync> {
let (i, flags) = parse_flags(i, &["requeue"])?;
Ok((
i,
RecoverAsync {
requeue: flags.get_flag("requeue").unwrap_or(false),
},
))
}
pub fn gen_recover_async<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a RecoverAsync,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("requeue".to_string(), method.requeue);
input = gen_id(100)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Recover {
pub requeue: Boolean,
}
impl Recover {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
110
}
}
pub fn parse_recover<I: ParsableInput>(i: I) -> ParserResult<I, Recover> {
let (i, flags) = parse_flags(i, &["requeue"])?;
Ok((
i,
Recover {
requeue: flags.get_flag("requeue").unwrap_or(false),
},
))
}
pub fn gen_recover<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Recover,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("requeue".to_string(), method.requeue);
input = gen_id(110)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct RecoverOk {}
impl RecoverOk {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
111
}
}
pub fn parse_recover_ok<I: ParsableInput>(i: I) -> ParserResult<I, RecoverOk> {
Ok((i, RecoverOk {}))
}
pub fn gen_recover_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a RecoverOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(111)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Nack {
pub delivery_tag: LongLongUInt,
pub multiple: Boolean,
pub requeue: Boolean,
}
impl Nack {
pub fn get_amqp_class_id(&self) -> Identifier {
60
}
pub fn get_amqp_method_id(&self) -> Identifier {
120
}
}
pub fn parse_nack<I: ParsableInput>(i: I) -> ParserResult<I, Nack> {
let (i, delivery_tag) = parse_long_long_uint(i)?;
let (i, flags) = parse_flags(i, &["multiple", "requeue"])?;
Ok((
i,
Nack {
delivery_tag,
multiple: flags.get_flag("multiple").unwrap_or(false),
requeue: flags.get_flag("requeue").unwrap_or(false),
},
))
}
pub fn gen_nack<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Nack,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("multiple".to_string(), method.multiple);
flags.add_flag("requeue".to_string(), method.requeue);
input = gen_id(120)(input)?;
input = gen_long_long_uint(method.delivery_tag)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct AMQPProperties {
content_type: Option<ShortString>,
content_encoding: Option<ShortString>,
headers: Option<FieldTable>,
delivery_mode: Option<ShortShortUInt>,
priority: Option<ShortShortUInt>,
correlation_id: Option<ShortString>,
reply_to: Option<ShortString>,
expiration: Option<ShortString>,
message_id: Option<ShortString>,
timestamp: Option<Timestamp>,
kind: Option<ShortString>,
user_id: Option<ShortString>,
app_id: Option<ShortString>,
cluster_id: Option<ShortString>,
}
impl AMQPProperties {
pub fn with_content_type(mut self, value: ShortString) -> Self {
self.content_type = Some(value);
self
}
pub fn with_content_encoding(mut self, value: ShortString) -> Self {
self.content_encoding = Some(value);
self
}
pub fn with_headers(mut self, value: FieldTable) -> Self {
self.headers = Some(value);
self
}
pub fn with_delivery_mode(mut self, value: ShortShortUInt) -> Self {
self.delivery_mode = Some(value);
self
}
pub fn with_priority(mut self, value: ShortShortUInt) -> Self {
self.priority = Some(value);
self
}
pub fn with_correlation_id(mut self, value: ShortString) -> Self {
self.correlation_id = Some(value);
self
}
pub fn with_reply_to(mut self, value: ShortString) -> Self {
self.reply_to = Some(value);
self
}
pub fn with_expiration(mut self, value: ShortString) -> Self {
self.expiration = Some(value);
self
}
pub fn with_message_id(mut self, value: ShortString) -> Self {
self.message_id = Some(value);
self
}
pub fn with_timestamp(mut self, value: Timestamp) -> Self {
self.timestamp = Some(value);
self
}
pub fn with_type(mut self, value: ShortString) -> Self {
self.kind = Some(value);
self
}
pub fn with_user_id(mut self, value: ShortString) -> Self {
self.user_id = Some(value);
self
}
pub fn with_app_id(mut self, value: ShortString) -> Self {
self.app_id = Some(value);
self
}
pub fn with_cluster_id(mut self, value: ShortString) -> Self {
self.cluster_id = Some(value);
self
}
pub fn content_type(&self) -> &Option<ShortString> {
&self.content_type
}
pub fn content_encoding(&self) -> &Option<ShortString> {
&self.content_encoding
}
pub fn headers(&self) -> &Option<FieldTable> {
&self.headers
}
pub fn delivery_mode(&self) -> &Option<ShortShortUInt> {
&self.delivery_mode
}
pub fn priority(&self) -> &Option<ShortShortUInt> {
&self.priority
}
pub fn correlation_id(&self) -> &Option<ShortString> {
&self.correlation_id
}
pub fn reply_to(&self) -> &Option<ShortString> {
&self.reply_to
}
pub fn expiration(&self) -> &Option<ShortString> {
&self.expiration
}
pub fn message_id(&self) -> &Option<ShortString> {
&self.message_id
}
pub fn timestamp(&self) -> &Option<Timestamp> {
&self.timestamp
}
pub fn kind(&self) -> &Option<ShortString> {
&self.kind
}
pub fn user_id(&self) -> &Option<ShortString> {
&self.user_id
}
pub fn app_id(&self) -> &Option<ShortString> {
&self.app_id
}
pub fn cluster_id(&self) -> &Option<ShortString> {
&self.cluster_id
}
#[allow(clippy::identity_op)]
pub fn bitmask(&self) -> ShortUInt {
(if self.content_type.is_some() {
1 << (15 - 0)
} else {
0
}) + (if self.content_encoding.is_some() {
1 << (15 - 1)
} else {
0
}) + (if self.headers.is_some() {
1 << (15 - 2)
} else {
0
}) + (if self.delivery_mode.is_some() {
1 << (15 - 3)
} else {
0
}) + (if self.priority.is_some() {
1 << (15 - 4)
} else {
0
}) + (if self.correlation_id.is_some() {
1 << (15 - 5)
} else {
0
}) + (if self.reply_to.is_some() {
1 << (15 - 6)
} else {
0
}) + (if self.expiration.is_some() {
1 << (15 - 7)
} else {
0
}) + (if self.message_id.is_some() {
1 << (15 - 8)
} else {
0
}) + (if self.timestamp.is_some() {
1 << (15 - 9)
} else {
0
}) + (if self.kind.is_some() {
1 << (15 - 10)
} else {
0
}) + (if self.user_id.is_some() {
1 << (15 - 11)
} else {
0
}) + (if self.app_id.is_some() {
1 << (15 - 12)
} else {
0
}) + (if self.cluster_id.is_some() {
1 << (15 - 13)
} else {
0
})
}
}
#[allow(clippy::identity_op)]
pub fn parse_properties<I: ParsableInput>(i: I) -> ParserResult<I, AMQPProperties> {
let (i, flags) = parse_short_uint(i)?;
let (i, content_type) = if flags & (1 << (15 - 0)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, content_encoding) = if flags & (1 << (15 - 1)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, headers) = if flags & (1 << (15 - 2)) != 0 {
map(parse_field_table, Some)(i)?
} else {
(i, None)
};
let (i, delivery_mode) = if flags & (1 << (15 - 3)) != 0 {
map(parse_short_short_uint, Some)(i)?
} else {
(i, None)
};
let (i, priority) = if flags & (1 << (15 - 4)) != 0 {
map(parse_short_short_uint, Some)(i)?
} else {
(i, None)
};
let (i, correlation_id) = if flags & (1 << (15 - 5)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, reply_to) = if flags & (1 << (15 - 6)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, expiration) = if flags & (1 << (15 - 7)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, message_id) = if flags & (1 << (15 - 8)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, timestamp) = if flags & (1 << (15 - 9)) != 0 {
map(parse_timestamp, Some)(i)?
} else {
(i, None)
};
let (i, kind) = if flags & (1 << (15 - 10)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, user_id) = if flags & (1 << (15 - 11)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, app_id) = if flags & (1 << (15 - 12)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
let (i, cluster_id) = if flags & (1 << (15 - 13)) != 0 {
map(parse_short_string, Some)(i)?
} else {
(i, None)
};
Ok((
i,
AMQPProperties {
content_type,
content_encoding,
headers,
delivery_mode,
priority,
correlation_id,
reply_to,
expiration,
message_id,
timestamp,
kind,
user_id,
app_id,
cluster_id,
},
))
}
pub fn gen_properties<'a, W: Write + BackToTheBuffer + 'a>(
props: &'a AMQPProperties,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_short_uint(props.bitmask()), move |mut input| {
if let Some(prop) = props.content_type.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.content_encoding.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.headers.as_ref() {
input = gen_field_table(prop)(input)?;
}
if let Some(prop) = props.delivery_mode {
input = gen_short_short_uint(prop)(input)?;
}
if let Some(prop) = props.priority {
input = gen_short_short_uint(prop)(input)?;
}
if let Some(prop) = props.correlation_id.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.reply_to.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.expiration.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.message_id.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.timestamp {
input = gen_timestamp(prop)(input)?;
}
if let Some(prop) = props.kind.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.user_id.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.app_id.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
if let Some(prop) = props.cluster_id.as_ref() {
input = gen_short_string(prop.as_str())(input)?;
}
Ok(input)
})
}
}
pub mod connection {
use super::*;
pub fn parse_connection<I: ParsableInput>(i: I) -> ParserResult<I, connection::AMQPMethod> {
context(
"parse_connection",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
10 => context(
"parse_start",
map(map(parse_start, AMQPMethod::Start), Some),
)(i),
11 => context(
"parse_start_ok",
map(map(parse_start_ok, AMQPMethod::StartOk), Some),
)(i),
20 => context(
"parse_secure",
map(map(parse_secure, AMQPMethod::Secure), Some),
)(i),
21 => context(
"parse_secure_ok",
map(map(parse_secure_ok, AMQPMethod::SecureOk), Some),
)(i),
30 => {
context("parse_tune", map(map(parse_tune, AMQPMethod::Tune), Some))(i)
}
31 => context(
"parse_tune_ok",
map(map(parse_tune_ok, AMQPMethod::TuneOk), Some),
)(i),
40 => {
context("parse_open", map(map(parse_open, AMQPMethod::Open), Some))(i)
}
41 => context(
"parse_open_ok",
map(map(parse_open_ok, AMQPMethod::OpenOk), Some),
)(i),
50 => context(
"parse_close",
map(map(parse_close, AMQPMethod::Close), Some),
)(i),
51 => context(
"parse_close_ok",
map(map(parse_close_ok, AMQPMethod::CloseOk), Some),
)(i),
60 => context(
"parse_blocked",
map(map(parse_blocked, AMQPMethod::Blocked), Some),
)(i),
61 => context(
"parse_unblocked",
map(map(parse_unblocked, AMQPMethod::Unblocked), Some),
)(i),
70 => context(
"parse_update_secret",
map(map(parse_update_secret, AMQPMethod::UpdateSecret), Some),
)(i),
71 => context(
"parse_update_secret_ok",
map(
map(parse_update_secret_ok, AMQPMethod::UpdateSecretOk),
Some,
),
)(i),
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_connection<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a AMQPMethod,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_id(10), move |input| match *method {
AMQPMethod::Start(ref start) => gen_start(start)(input),
AMQPMethod::StartOk(ref start_ok) => gen_start_ok(start_ok)(input),
AMQPMethod::Secure(ref secure) => gen_secure(secure)(input),
AMQPMethod::SecureOk(ref secure_ok) => gen_secure_ok(secure_ok)(input),
AMQPMethod::Tune(ref tune) => gen_tune(tune)(input),
AMQPMethod::TuneOk(ref tune_ok) => gen_tune_ok(tune_ok)(input),
AMQPMethod::Open(ref open) => gen_open(open)(input),
AMQPMethod::OpenOk(ref open_ok) => gen_open_ok(open_ok)(input),
AMQPMethod::Close(ref close) => gen_close(close)(input),
AMQPMethod::CloseOk(ref close_ok) => gen_close_ok(close_ok)(input),
AMQPMethod::Blocked(ref blocked) => gen_blocked(blocked)(input),
AMQPMethod::Unblocked(ref unblocked) => gen_unblocked(unblocked)(input),
AMQPMethod::UpdateSecret(ref update_secret) => gen_update_secret(update_secret)(input),
AMQPMethod::UpdateSecretOk(ref update_secret_ok) => {
gen_update_secret_ok(update_secret_ok)(input)
}
})
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPMethod {
Start(Start),
StartOk(StartOk),
Secure(Secure),
SecureOk(SecureOk),
Tune(Tune),
TuneOk(TuneOk),
Open(Open),
OpenOk(OpenOk),
Close(Close),
CloseOk(CloseOk),
Blocked(Blocked),
Unblocked(Unblocked),
UpdateSecret(UpdateSecret),
UpdateSecretOk(UpdateSecretOk),
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Start {
pub version_major: ShortShortUInt,
pub version_minor: ShortShortUInt,
pub server_properties: FieldTable,
pub mechanisms: LongString,
pub locales: LongString,
}
impl Start {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
10
}
}
pub fn parse_start<I: ParsableInput>(i: I) -> ParserResult<I, Start> {
let (i, version_major) = parse_short_short_uint(i)?;
let (i, version_minor) = parse_short_short_uint(i)?;
let (i, server_properties) = parse_field_table(i)?;
let (i, mechanisms) = parse_long_string(i)?;
let (i, locales) = parse_long_string(i)?;
Ok((
i,
Start {
version_major,
version_minor,
server_properties,
mechanisms,
locales,
},
))
}
pub fn gen_start<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Start,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(10)(input)?;
input = gen_short_short_uint(method.version_major)(input)?;
input = gen_short_short_uint(method.version_minor)(input)?;
input = gen_field_table(&method.server_properties)(input)?;
input = gen_long_string(method.mechanisms.as_bytes())(input)?;
input = gen_long_string(method.locales.as_bytes())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct StartOk {
pub client_properties: FieldTable,
pub mechanism: ShortString,
pub response: LongString,
pub locale: ShortString,
}
impl StartOk {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
11
}
}
pub fn parse_start_ok<I: ParsableInput>(i: I) -> ParserResult<I, StartOk> {
let (i, client_properties) = parse_field_table(i)?;
let (i, mechanism) = parse_short_string(i)?;
let (i, response) = parse_long_string(i)?;
let (i, locale) = parse_short_string(i)?;
Ok((
i,
StartOk {
client_properties,
mechanism,
response,
locale,
},
))
}
pub fn gen_start_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a StartOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(11)(input)?;
input = gen_field_table(&method.client_properties)(input)?;
input = gen_short_string(method.mechanism.as_str())(input)?;
input = gen_long_string(method.response.as_bytes())(input)?;
input = gen_short_string(method.locale.as_str())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Secure {
pub challenge: LongString,
}
impl Secure {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
20
}
}
pub fn parse_secure<I: ParsableInput>(i: I) -> ParserResult<I, Secure> {
let (i, challenge) = parse_long_string(i)?;
Ok((i, Secure { challenge }))
}
pub fn gen_secure<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Secure,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(20)(input)?;
input = gen_long_string(method.challenge.as_bytes())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct SecureOk {
pub response: LongString,
}
impl SecureOk {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
21
}
}
pub fn parse_secure_ok<I: ParsableInput>(i: I) -> ParserResult<I, SecureOk> {
let (i, response) = parse_long_string(i)?;
Ok((i, SecureOk { response }))
}
pub fn gen_secure_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a SecureOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(21)(input)?;
input = gen_long_string(method.response.as_bytes())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Tune {
pub channel_max: ShortUInt,
pub frame_max: LongUInt,
pub heartbeat: ShortUInt,
}
impl Tune {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
30
}
}
pub fn parse_tune<I: ParsableInput>(i: I) -> ParserResult<I, Tune> {
let (i, channel_max) = parse_short_uint(i)?;
let (i, frame_max) = parse_long_uint(i)?;
let (i, heartbeat) = parse_short_uint(i)?;
Ok((
i,
Tune {
channel_max,
frame_max,
heartbeat,
},
))
}
pub fn gen_tune<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Tune,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(30)(input)?;
input = gen_short_uint(method.channel_max)(input)?;
input = gen_long_uint(method.frame_max)(input)?;
input = gen_short_uint(method.heartbeat)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct TuneOk {
pub channel_max: ShortUInt,
pub frame_max: LongUInt,
pub heartbeat: ShortUInt,
}
impl TuneOk {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
31
}
}
pub fn parse_tune_ok<I: ParsableInput>(i: I) -> ParserResult<I, TuneOk> {
let (i, channel_max) = parse_short_uint(i)?;
let (i, frame_max) = parse_long_uint(i)?;
let (i, heartbeat) = parse_short_uint(i)?;
Ok((
i,
TuneOk {
channel_max,
frame_max,
heartbeat,
},
))
}
pub fn gen_tune_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a TuneOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(31)(input)?;
input = gen_short_uint(method.channel_max)(input)?;
input = gen_long_uint(method.frame_max)(input)?;
input = gen_short_uint(method.heartbeat)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Open {
pub virtual_host: ShortString,
}
impl Open {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
40
}
}
pub fn parse_open<I: ParsableInput>(i: I) -> ParserResult<I, Open> {
let (i, virtual_host) = parse_short_string(i)?;
let (i, _) = parse_short_string(i)?;
let (i, _) = parse_flags(i, &["insist"])?;
Ok((i, Open { virtual_host }))
}
pub fn gen_open<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Open,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("insist".to_string(), false);
input = gen_id(40)(input)?;
input = gen_short_string(method.virtual_host.as_str())(input)?;
input = gen_short_string("")(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct OpenOk {}
impl OpenOk {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
41
}
}
pub fn parse_open_ok<I: ParsableInput>(i: I) -> ParserResult<I, OpenOk> {
let (i, _) = parse_short_string(i)?;
Ok((i, OpenOk {}))
}
pub fn gen_open_ok<'a, W: Write + BackToTheBuffer + 'a>(
_method: &'a OpenOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(41)(input)?;
input = gen_short_string("")(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Close {
pub reply_code: ShortUInt,
pub reply_text: ShortString,
pub class_id: ShortUInt,
pub method_id: ShortUInt,
}
impl Close {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
50
}
}
pub fn parse_close<I: ParsableInput>(i: I) -> ParserResult<I, Close> {
let (i, reply_code) = parse_short_uint(i)?;
let (i, reply_text) = parse_short_string(i)?;
let (i, class_id) = parse_short_uint(i)?;
let (i, method_id) = parse_short_uint(i)?;
Ok((
i,
Close {
reply_code,
reply_text,
class_id,
method_id,
},
))
}
pub fn gen_close<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Close,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(50)(input)?;
input = gen_short_uint(method.reply_code)(input)?;
input = gen_short_string(method.reply_text.as_str())(input)?;
input = gen_short_uint(method.class_id)(input)?;
input = gen_short_uint(method.method_id)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CloseOk {}
impl CloseOk {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
51
}
}
pub fn parse_close_ok<I: ParsableInput>(i: I) -> ParserResult<I, CloseOk> {
Ok((i, CloseOk {}))
}
pub fn gen_close_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a CloseOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(51)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Blocked {
pub reason: ShortString,
}
impl Blocked {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
60
}
}
pub fn parse_blocked<I: ParsableInput>(i: I) -> ParserResult<I, Blocked> {
let (i, reason) = parse_short_string(i)?;
Ok((i, Blocked { reason }))
}
pub fn gen_blocked<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Blocked,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(60)(input)?;
input = gen_short_string(method.reason.as_str())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Unblocked {}
impl Unblocked {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
61
}
}
pub fn parse_unblocked<I: ParsableInput>(i: I) -> ParserResult<I, Unblocked> {
Ok((i, Unblocked {}))
}
pub fn gen_unblocked<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a Unblocked,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(61)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct UpdateSecret {
pub new_secret: LongString,
pub reason: ShortString,
}
impl UpdateSecret {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
70
}
}
pub fn parse_update_secret<I: ParsableInput>(i: I) -> ParserResult<I, UpdateSecret> {
let (i, new_secret) = parse_long_string(i)?;
let (i, reason) = parse_short_string(i)?;
Ok((i, UpdateSecret { new_secret, reason }))
}
pub fn gen_update_secret<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a UpdateSecret,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(70)(input)?;
input = gen_long_string(method.new_secret.as_bytes())(input)?;
input = gen_short_string(method.reason.as_str())(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct UpdateSecretOk {}
impl UpdateSecretOk {
pub fn get_amqp_class_id(&self) -> Identifier {
10
}
pub fn get_amqp_method_id(&self) -> Identifier {
71
}
}
pub fn parse_update_secret_ok<I: ParsableInput>(i: I) -> ParserResult<I, UpdateSecretOk> {
Ok((i, UpdateSecretOk {}))
}
pub fn gen_update_secret_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a UpdateSecretOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(71)(input)?;
Ok(input)
}
}
}
pub mod channel {
use super::*;
pub fn parse_channel<I: ParsableInput>(i: I) -> ParserResult<I, channel::AMQPMethod> {
context(
"parse_channel",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
10 => {
context("parse_open", map(map(parse_open, AMQPMethod::Open), Some))(i)
}
11 => context(
"parse_open_ok",
map(map(parse_open_ok, AMQPMethod::OpenOk), Some),
)(i),
20 => {
context("parse_flow", map(map(parse_flow, AMQPMethod::Flow), Some))(i)
}
21 => context(
"parse_flow_ok",
map(map(parse_flow_ok, AMQPMethod::FlowOk), Some),
)(i),
40 => context(
"parse_close",
map(map(parse_close, AMQPMethod::Close), Some),
)(i),
41 => context(
"parse_close_ok",
map(map(parse_close_ok, AMQPMethod::CloseOk), Some),
)(i),
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_channel<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a AMQPMethod,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_id(20), move |input| match *method {
AMQPMethod::Open(ref open) => gen_open(open)(input),
AMQPMethod::OpenOk(ref open_ok) => gen_open_ok(open_ok)(input),
AMQPMethod::Flow(ref flow) => gen_flow(flow)(input),
AMQPMethod::FlowOk(ref flow_ok) => gen_flow_ok(flow_ok)(input),
AMQPMethod::Close(ref close) => gen_close(close)(input),
AMQPMethod::CloseOk(ref close_ok) => gen_close_ok(close_ok)(input),
})
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPMethod {
Open(Open),
OpenOk(OpenOk),
Flow(Flow),
FlowOk(FlowOk),
Close(Close),
CloseOk(CloseOk),
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Open {}
impl Open {
pub fn get_amqp_class_id(&self) -> Identifier {
20
}
pub fn get_amqp_method_id(&self) -> Identifier {
10
}
}
pub fn parse_open<I: ParsableInput>(i: I) -> ParserResult<I, Open> {
let (i, _) = parse_short_string(i)?;
Ok((i, Open {}))
}
pub fn gen_open<'a, W: Write + BackToTheBuffer + 'a>(
_method: &'a Open,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(10)(input)?;
input = gen_short_string("")(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct OpenOk {}
impl OpenOk {
pub fn get_amqp_class_id(&self) -> Identifier {
20
}
pub fn get_amqp_method_id(&self) -> Identifier {
11
}
}
pub fn parse_open_ok<I: ParsableInput>(i: I) -> ParserResult<I, OpenOk> {
let (i, _) = parse_long_string(i)?;
Ok((i, OpenOk {}))
}
pub fn gen_open_ok<'a, W: Write + BackToTheBuffer + 'a>(
_method: &'a OpenOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(11)(input)?;
input = gen_long_string(b"")(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Flow {
pub active: Boolean,
}
impl Flow {
pub fn get_amqp_class_id(&self) -> Identifier {
20
}
pub fn get_amqp_method_id(&self) -> Identifier {
20
}
}
pub fn parse_flow<I: ParsableInput>(i: I) -> ParserResult<I, Flow> {
let (i, flags) = parse_flags(i, &["active"])?;
Ok((
i,
Flow {
active: flags.get_flag("active").unwrap_or(false),
},
))
}
pub fn gen_flow<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Flow,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("active".to_string(), method.active);
input = gen_id(20)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct FlowOk {
pub active: Boolean,
}
impl FlowOk {
pub fn get_amqp_class_id(&self) -> Identifier {
20
}
pub fn get_amqp_method_id(&self) -> Identifier {
21
}
}
pub fn parse_flow_ok<I: ParsableInput>(i: I) -> ParserResult<I, FlowOk> {
let (i, flags) = parse_flags(i, &["active"])?;
Ok((
i,
FlowOk {
active: flags.get_flag("active").unwrap_or(false),
},
))
}
pub fn gen_flow_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a FlowOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("active".to_string(), method.active);
input = gen_id(21)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Close {
pub reply_code: ShortUInt,
pub reply_text: ShortString,
pub class_id: ShortUInt,
pub method_id: ShortUInt,
}
impl Close {
pub fn get_amqp_class_id(&self) -> Identifier {
20
}
pub fn get_amqp_method_id(&self) -> Identifier {
40
}
}
pub fn parse_close<I: ParsableInput>(i: I) -> ParserResult<I, Close> {
let (i, reply_code) = parse_short_uint(i)?;
let (i, reply_text) = parse_short_string(i)?;
let (i, class_id) = parse_short_uint(i)?;
let (i, method_id) = parse_short_uint(i)?;
Ok((
i,
Close {
reply_code,
reply_text,
class_id,
method_id,
},
))
}
pub fn gen_close<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Close,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(40)(input)?;
input = gen_short_uint(method.reply_code)(input)?;
input = gen_short_string(method.reply_text.as_str())(input)?;
input = gen_short_uint(method.class_id)(input)?;
input = gen_short_uint(method.method_id)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CloseOk {}
impl CloseOk {
pub fn get_amqp_class_id(&self) -> Identifier {
20
}
pub fn get_amqp_method_id(&self) -> Identifier {
41
}
}
pub fn parse_close_ok<I: ParsableInput>(i: I) -> ParserResult<I, CloseOk> {
Ok((i, CloseOk {}))
}
pub fn gen_close_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a CloseOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(41)(input)?;
Ok(input)
}
}
}
pub mod access {
use super::*;
pub fn parse_access<I: ParsableInput>(i: I) -> ParserResult<I, access::AMQPMethod> {
context(
"parse_access",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
10 => context(
"parse_request",
map(map(parse_request, AMQPMethod::Request), Some),
)(i),
11 => context(
"parse_request_ok",
map(map(parse_request_ok, AMQPMethod::RequestOk), Some),
)(i),
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_access<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a AMQPMethod,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_id(30), move |input| match *method {
AMQPMethod::Request(ref request) => gen_request(request)(input),
AMQPMethod::RequestOk(ref request_ok) => gen_request_ok(request_ok)(input),
})
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPMethod {
Request(Request),
RequestOk(RequestOk),
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Request {
pub realm: ShortString,
pub exclusive: Boolean,
pub passive: Boolean,
pub active: Boolean,
pub write: Boolean,
pub read: Boolean,
}
impl Request {
pub fn get_amqp_class_id(&self) -> Identifier {
30
}
pub fn get_amqp_method_id(&self) -> Identifier {
10
}
}
pub fn parse_request<I: ParsableInput>(i: I) -> ParserResult<I, Request> {
let (i, realm) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["exclusive", "passive", "active", "write", "read"])?;
Ok((
i,
Request {
realm,
exclusive: flags.get_flag("exclusive").unwrap_or(false),
passive: flags.get_flag("passive").unwrap_or(false),
active: flags.get_flag("active").unwrap_or(false),
write: flags.get_flag("write").unwrap_or(false),
read: flags.get_flag("read").unwrap_or(false),
},
))
}
pub fn gen_request<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Request,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("exclusive".to_string(), method.exclusive);
flags.add_flag("passive".to_string(), method.passive);
flags.add_flag("active".to_string(), method.active);
flags.add_flag("write".to_string(), method.write);
flags.add_flag("read".to_string(), method.read);
input = gen_id(10)(input)?;
input = gen_short_string(method.realm.as_str())(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct RequestOk {}
impl RequestOk {
pub fn get_amqp_class_id(&self) -> Identifier {
30
}
pub fn get_amqp_method_id(&self) -> Identifier {
11
}
}
pub fn parse_request_ok<I: ParsableInput>(i: I) -> ParserResult<I, RequestOk> {
let (i, _) = parse_short_uint(i)?;
Ok((i, RequestOk {}))
}
pub fn gen_request_ok<'a, W: Write + BackToTheBuffer + 'a>(
_method: &'a RequestOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(11)(input)?;
input = gen_short_uint(1)(input)?;
Ok(input)
}
}
}
pub mod exchange {
use super::*;
pub fn parse_exchange<I: ParsableInput>(i: I) -> ParserResult<I, exchange::AMQPMethod> {
context(
"parse_exchange",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
10 => context(
"parse_declare",
map(map(parse_declare, AMQPMethod::Declare), Some),
)(i),
11 => context(
"parse_declare_ok",
map(map(parse_declare_ok, AMQPMethod::DeclareOk), Some),
)(i),
20 => context(
"parse_delete",
map(map(parse_delete, AMQPMethod::Delete), Some),
)(i),
21 => context(
"parse_delete_ok",
map(map(parse_delete_ok, AMQPMethod::DeleteOk), Some),
)(i),
30 => {
context("parse_bind", map(map(parse_bind, AMQPMethod::Bind), Some))(i)
}
31 => context(
"parse_bind_ok",
map(map(parse_bind_ok, AMQPMethod::BindOk), Some),
)(i),
40 => context(
"parse_unbind",
map(map(parse_unbind, AMQPMethod::Unbind), Some),
)(i),
51 => context(
"parse_unbind_ok",
map(map(parse_unbind_ok, AMQPMethod::UnbindOk), Some),
)(i),
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_exchange<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a AMQPMethod,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_id(40), move |input| match *method {
AMQPMethod::Declare(ref declare) => gen_declare(declare)(input),
AMQPMethod::DeclareOk(ref declare_ok) => gen_declare_ok(declare_ok)(input),
AMQPMethod::Delete(ref delete) => gen_delete(delete)(input),
AMQPMethod::DeleteOk(ref delete_ok) => gen_delete_ok(delete_ok)(input),
AMQPMethod::Bind(ref bind) => gen_bind(bind)(input),
AMQPMethod::BindOk(ref bind_ok) => gen_bind_ok(bind_ok)(input),
AMQPMethod::Unbind(ref unbind) => gen_unbind(unbind)(input),
AMQPMethod::UnbindOk(ref unbind_ok) => gen_unbind_ok(unbind_ok)(input),
})
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPMethod {
Declare(Declare),
DeclareOk(DeclareOk),
Delete(Delete),
DeleteOk(DeleteOk),
Bind(Bind),
BindOk(BindOk),
Unbind(Unbind),
UnbindOk(UnbindOk),
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Declare {
pub exchange: ShortString,
pub kind: ShortString,
pub passive: Boolean,
pub durable: Boolean,
pub auto_delete: Boolean,
pub internal: Boolean,
pub nowait: Boolean,
pub arguments: FieldTable,
}
impl Declare {
pub fn get_amqp_class_id(&self) -> Identifier {
40
}
pub fn get_amqp_method_id(&self) -> Identifier {
10
}
}
pub fn parse_declare<I: ParsableInput>(i: I) -> ParserResult<I, Declare> {
let (i, _) = parse_short_uint(i)?;
let (i, exchange) = parse_short_string(i)?;
let (i, kind) = parse_short_string(i)?;
let (i, flags) = parse_flags(
i,
&["passive", "durable", "auto-delete", "internal", "nowait"],
)?;
let (i, arguments) = parse_field_table(i)?;
Ok((
i,
Declare {
exchange,
kind,
passive: flags.get_flag("passive").unwrap_or(false),
durable: flags.get_flag("durable").unwrap_or(false),
auto_delete: flags.get_flag("auto_delete").unwrap_or(false),
internal: flags.get_flag("internal").unwrap_or(false),
nowait: flags.get_flag("nowait").unwrap_or(false),
arguments,
},
))
}
pub fn gen_declare<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Declare,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("passive".to_string(), method.passive);
flags.add_flag("durable".to_string(), method.durable);
flags.add_flag("auto_delete".to_string(), method.auto_delete);
flags.add_flag("internal".to_string(), method.internal);
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(10)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.exchange.as_str())(input)?;
input = gen_short_string(method.kind.as_str())(input)?;
input = gen_flags(&flags)(input)?;
input = gen_field_table(&method.arguments)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeclareOk {}
impl DeclareOk {
pub fn get_amqp_class_id(&self) -> Identifier {
40
}
pub fn get_amqp_method_id(&self) -> Identifier {
11
}
}
pub fn parse_declare_ok<I: ParsableInput>(i: I) -> ParserResult<I, DeclareOk> {
Ok((i, DeclareOk {}))
}
pub fn gen_declare_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a DeclareOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(11)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Delete {
pub exchange: ShortString,
pub if_unused: Boolean,
pub nowait: Boolean,
}
impl Delete {
pub fn get_amqp_class_id(&self) -> Identifier {
40
}
pub fn get_amqp_method_id(&self) -> Identifier {
20
}
}
pub fn parse_delete<I: ParsableInput>(i: I) -> ParserResult<I, Delete> {
let (i, _) = parse_short_uint(i)?;
let (i, exchange) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["if-unused", "nowait"])?;
Ok((
i,
Delete {
exchange,
if_unused: flags.get_flag("if_unused").unwrap_or(false),
nowait: flags.get_flag("nowait").unwrap_or(false),
},
))
}
pub fn gen_delete<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Delete,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("if_unused".to_string(), method.if_unused);
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(20)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.exchange.as_str())(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeleteOk {}
impl DeleteOk {
pub fn get_amqp_class_id(&self) -> Identifier {
40
}
pub fn get_amqp_method_id(&self) -> Identifier {
21
}
}
pub fn parse_delete_ok<I: ParsableInput>(i: I) -> ParserResult<I, DeleteOk> {
Ok((i, DeleteOk {}))
}
pub fn gen_delete_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a DeleteOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(21)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Bind {
pub destination: ShortString,
pub source: ShortString,
pub routing_key: ShortString,
pub nowait: Boolean,
pub arguments: FieldTable,
}
impl Bind {
pub fn get_amqp_class_id(&self) -> Identifier {
40
}
pub fn get_amqp_method_id(&self) -> Identifier {
30
}
}
pub fn parse_bind<I: ParsableInput>(i: I) -> ParserResult<I, Bind> {
let (i, _) = parse_short_uint(i)?;
let (i, destination) = parse_short_string(i)?;
let (i, source) = parse_short_string(i)?;
let (i, routing_key) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["nowait"])?;
let (i, arguments) = parse_field_table(i)?;
Ok((
i,
Bind {
destination,
source,
routing_key,
nowait: flags.get_flag("nowait").unwrap_or(false),
arguments,
},
))
}
pub fn gen_bind<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Bind,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(30)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.destination.as_str())(input)?;
input = gen_short_string(method.source.as_str())(input)?;
input = gen_short_string(method.routing_key.as_str())(input)?;
input = gen_flags(&flags)(input)?;
input = gen_field_table(&method.arguments)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BindOk {}
impl BindOk {
pub fn get_amqp_class_id(&self) -> Identifier {
40
}
pub fn get_amqp_method_id(&self) -> Identifier {
31
}
}
pub fn parse_bind_ok<I: ParsableInput>(i: I) -> ParserResult<I, BindOk> {
Ok((i, BindOk {}))
}
pub fn gen_bind_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a BindOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(31)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Unbind {
pub destination: ShortString,
pub source: ShortString,
pub routing_key: ShortString,
pub nowait: Boolean,
pub arguments: FieldTable,
}
impl Unbind {
pub fn get_amqp_class_id(&self) -> Identifier {
40
}
pub fn get_amqp_method_id(&self) -> Identifier {
40
}
}
pub fn parse_unbind<I: ParsableInput>(i: I) -> ParserResult<I, Unbind> {
let (i, _) = parse_short_uint(i)?;
let (i, destination) = parse_short_string(i)?;
let (i, source) = parse_short_string(i)?;
let (i, routing_key) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["nowait"])?;
let (i, arguments) = parse_field_table(i)?;
Ok((
i,
Unbind {
destination,
source,
routing_key,
nowait: flags.get_flag("nowait").unwrap_or(false),
arguments,
},
))
}
pub fn gen_unbind<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Unbind,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(40)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.destination.as_str())(input)?;
input = gen_short_string(method.source.as_str())(input)?;
input = gen_short_string(method.routing_key.as_str())(input)?;
input = gen_flags(&flags)(input)?;
input = gen_field_table(&method.arguments)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct UnbindOk {}
impl UnbindOk {
pub fn get_amqp_class_id(&self) -> Identifier {
40
}
pub fn get_amqp_method_id(&self) -> Identifier {
51
}
}
pub fn parse_unbind_ok<I: ParsableInput>(i: I) -> ParserResult<I, UnbindOk> {
Ok((i, UnbindOk {}))
}
pub fn gen_unbind_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a UnbindOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(51)(input)?;
Ok(input)
}
}
}
pub mod queue {
use super::*;
pub fn parse_queue<I: ParsableInput>(i: I) -> ParserResult<I, queue::AMQPMethod> {
context(
"parse_queue",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
10 => context(
"parse_declare",
map(map(parse_declare, AMQPMethod::Declare), Some),
)(i),
11 => context(
"parse_declare_ok",
map(map(parse_declare_ok, AMQPMethod::DeclareOk), Some),
)(i),
20 => {
context("parse_bind", map(map(parse_bind, AMQPMethod::Bind), Some))(i)
}
21 => context(
"parse_bind_ok",
map(map(parse_bind_ok, AMQPMethod::BindOk), Some),
)(i),
30 => context(
"parse_purge",
map(map(parse_purge, AMQPMethod::Purge), Some),
)(i),
31 => context(
"parse_purge_ok",
map(map(parse_purge_ok, AMQPMethod::PurgeOk), Some),
)(i),
40 => context(
"parse_delete",
map(map(parse_delete, AMQPMethod::Delete), Some),
)(i),
41 => context(
"parse_delete_ok",
map(map(parse_delete_ok, AMQPMethod::DeleteOk), Some),
)(i),
50 => context(
"parse_unbind",
map(map(parse_unbind, AMQPMethod::Unbind), Some),
)(i),
51 => context(
"parse_unbind_ok",
map(map(parse_unbind_ok, AMQPMethod::UnbindOk), Some),
)(i),
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_queue<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a AMQPMethod,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_id(50), move |input| match *method {
AMQPMethod::Declare(ref declare) => gen_declare(declare)(input),
AMQPMethod::DeclareOk(ref declare_ok) => gen_declare_ok(declare_ok)(input),
AMQPMethod::Bind(ref bind) => gen_bind(bind)(input),
AMQPMethod::BindOk(ref bind_ok) => gen_bind_ok(bind_ok)(input),
AMQPMethod::Purge(ref purge) => gen_purge(purge)(input),
AMQPMethod::PurgeOk(ref purge_ok) => gen_purge_ok(purge_ok)(input),
AMQPMethod::Delete(ref delete) => gen_delete(delete)(input),
AMQPMethod::DeleteOk(ref delete_ok) => gen_delete_ok(delete_ok)(input),
AMQPMethod::Unbind(ref unbind) => gen_unbind(unbind)(input),
AMQPMethod::UnbindOk(ref unbind_ok) => gen_unbind_ok(unbind_ok)(input),
})
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPMethod {
Declare(Declare),
DeclareOk(DeclareOk),
Bind(Bind),
BindOk(BindOk),
Purge(Purge),
PurgeOk(PurgeOk),
Delete(Delete),
DeleteOk(DeleteOk),
Unbind(Unbind),
UnbindOk(UnbindOk),
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Declare {
pub queue: ShortString,
pub passive: Boolean,
pub durable: Boolean,
pub exclusive: Boolean,
pub auto_delete: Boolean,
pub nowait: Boolean,
pub arguments: FieldTable,
}
impl Declare {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
10
}
}
pub fn parse_declare<I: ParsableInput>(i: I) -> ParserResult<I, Declare> {
let (i, _) = parse_short_uint(i)?;
let (i, queue) = parse_short_string(i)?;
let (i, flags) = parse_flags(
i,
&["passive", "durable", "exclusive", "auto-delete", "nowait"],
)?;
let (i, arguments) = parse_field_table(i)?;
Ok((
i,
Declare {
queue,
passive: flags.get_flag("passive").unwrap_or(false),
durable: flags.get_flag("durable").unwrap_or(false),
exclusive: flags.get_flag("exclusive").unwrap_or(false),
auto_delete: flags.get_flag("auto_delete").unwrap_or(false),
nowait: flags.get_flag("nowait").unwrap_or(false),
arguments,
},
))
}
pub fn gen_declare<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Declare,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("passive".to_string(), method.passive);
flags.add_flag("durable".to_string(), method.durable);
flags.add_flag("exclusive".to_string(), method.exclusive);
flags.add_flag("auto_delete".to_string(), method.auto_delete);
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(10)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.queue.as_str())(input)?;
input = gen_flags(&flags)(input)?;
input = gen_field_table(&method.arguments)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeclareOk {
pub queue: ShortString,
pub message_count: LongUInt,
pub consumer_count: LongUInt,
}
impl DeclareOk {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
11
}
}
pub fn parse_declare_ok<I: ParsableInput>(i: I) -> ParserResult<I, DeclareOk> {
let (i, queue) = parse_short_string(i)?;
let (i, message_count) = parse_long_uint(i)?;
let (i, consumer_count) = parse_long_uint(i)?;
Ok((
i,
DeclareOk {
queue,
message_count,
consumer_count,
},
))
}
pub fn gen_declare_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a DeclareOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(11)(input)?;
input = gen_short_string(method.queue.as_str())(input)?;
input = gen_long_uint(method.message_count)(input)?;
input = gen_long_uint(method.consumer_count)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Bind {
pub queue: ShortString,
pub exchange: ShortString,
pub routing_key: ShortString,
pub nowait: Boolean,
pub arguments: FieldTable,
}
impl Bind {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
20
}
}
pub fn parse_bind<I: ParsableInput>(i: I) -> ParserResult<I, Bind> {
let (i, _) = parse_short_uint(i)?;
let (i, queue) = parse_short_string(i)?;
let (i, exchange) = parse_short_string(i)?;
let (i, routing_key) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["nowait"])?;
let (i, arguments) = parse_field_table(i)?;
Ok((
i,
Bind {
queue,
exchange,
routing_key,
nowait: flags.get_flag("nowait").unwrap_or(false),
arguments,
},
))
}
pub fn gen_bind<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Bind,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(20)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.queue.as_str())(input)?;
input = gen_short_string(method.exchange.as_str())(input)?;
input = gen_short_string(method.routing_key.as_str())(input)?;
input = gen_flags(&flags)(input)?;
input = gen_field_table(&method.arguments)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BindOk {}
impl BindOk {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
21
}
}
pub fn parse_bind_ok<I: ParsableInput>(i: I) -> ParserResult<I, BindOk> {
Ok((i, BindOk {}))
}
pub fn gen_bind_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a BindOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(21)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Purge {
pub queue: ShortString,
pub nowait: Boolean,
}
impl Purge {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
30
}
}
pub fn parse_purge<I: ParsableInput>(i: I) -> ParserResult<I, Purge> {
let (i, _) = parse_short_uint(i)?;
let (i, queue) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["nowait"])?;
Ok((
i,
Purge {
queue,
nowait: flags.get_flag("nowait").unwrap_or(false),
},
))
}
pub fn gen_purge<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Purge,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(30)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.queue.as_str())(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PurgeOk {
pub message_count: LongUInt,
}
impl PurgeOk {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
31
}
}
pub fn parse_purge_ok<I: ParsableInput>(i: I) -> ParserResult<I, PurgeOk> {
let (i, message_count) = parse_long_uint(i)?;
Ok((i, PurgeOk { message_count }))
}
pub fn gen_purge_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a PurgeOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(31)(input)?;
input = gen_long_uint(method.message_count)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Delete {
pub queue: ShortString,
pub if_unused: Boolean,
pub if_empty: Boolean,
pub nowait: Boolean,
}
impl Delete {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
40
}
}
pub fn parse_delete<I: ParsableInput>(i: I) -> ParserResult<I, Delete> {
let (i, _) = parse_short_uint(i)?;
let (i, queue) = parse_short_string(i)?;
let (i, flags) = parse_flags(i, &["if-unused", "if-empty", "nowait"])?;
Ok((
i,
Delete {
queue,
if_unused: flags.get_flag("if_unused").unwrap_or(false),
if_empty: flags.get_flag("if_empty").unwrap_or(false),
nowait: flags.get_flag("nowait").unwrap_or(false),
},
))
}
pub fn gen_delete<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Delete,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("if_unused".to_string(), method.if_unused);
flags.add_flag("if_empty".to_string(), method.if_empty);
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(40)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.queue.as_str())(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeleteOk {
pub message_count: LongUInt,
}
impl DeleteOk {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
41
}
}
pub fn parse_delete_ok<I: ParsableInput>(i: I) -> ParserResult<I, DeleteOk> {
let (i, message_count) = parse_long_uint(i)?;
Ok((i, DeleteOk { message_count }))
}
pub fn gen_delete_ok<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a DeleteOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(41)(input)?;
input = gen_long_uint(method.message_count)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Unbind {
pub queue: ShortString,
pub exchange: ShortString,
pub routing_key: ShortString,
pub arguments: FieldTable,
}
impl Unbind {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
50
}
}
pub fn parse_unbind<I: ParsableInput>(i: I) -> ParserResult<I, Unbind> {
let (i, _) = parse_short_uint(i)?;
let (i, queue) = parse_short_string(i)?;
let (i, exchange) = parse_short_string(i)?;
let (i, routing_key) = parse_short_string(i)?;
let (i, arguments) = parse_field_table(i)?;
Ok((
i,
Unbind {
queue,
exchange,
routing_key,
arguments,
},
))
}
pub fn gen_unbind<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Unbind,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(50)(input)?;
input = gen_short_uint(0)(input)?;
input = gen_short_string(method.queue.as_str())(input)?;
input = gen_short_string(method.exchange.as_str())(input)?;
input = gen_short_string(method.routing_key.as_str())(input)?;
input = gen_field_table(&method.arguments)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct UnbindOk {}
impl UnbindOk {
pub fn get_amqp_class_id(&self) -> Identifier {
50
}
pub fn get_amqp_method_id(&self) -> Identifier {
51
}
}
pub fn parse_unbind_ok<I: ParsableInput>(i: I) -> ParserResult<I, UnbindOk> {
Ok((i, UnbindOk {}))
}
pub fn gen_unbind_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a UnbindOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(51)(input)?;
Ok(input)
}
}
}
pub mod tx {
use super::*;
pub fn parse_tx<I: ParsableInput>(i: I) -> ParserResult<I, tx::AMQPMethod> {
context(
"parse_tx",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
10 => context(
"parse_select",
map(map(parse_select, AMQPMethod::Select), Some),
)(i),
11 => context(
"parse_select_ok",
map(map(parse_select_ok, AMQPMethod::SelectOk), Some),
)(i),
20 => context(
"parse_commit",
map(map(parse_commit, AMQPMethod::Commit), Some),
)(i),
21 => context(
"parse_commit_ok",
map(map(parse_commit_ok, AMQPMethod::CommitOk), Some),
)(i),
30 => context(
"parse_rollback",
map(map(parse_rollback, AMQPMethod::Rollback), Some),
)(i),
31 => context(
"parse_rollback_ok",
map(map(parse_rollback_ok, AMQPMethod::RollbackOk), Some),
)(i),
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_tx<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a AMQPMethod,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_id(90), move |input| match *method {
AMQPMethod::Select(ref select) => gen_select(select)(input),
AMQPMethod::SelectOk(ref select_ok) => gen_select_ok(select_ok)(input),
AMQPMethod::Commit(ref commit) => gen_commit(commit)(input),
AMQPMethod::CommitOk(ref commit_ok) => gen_commit_ok(commit_ok)(input),
AMQPMethod::Rollback(ref rollback) => gen_rollback(rollback)(input),
AMQPMethod::RollbackOk(ref rollback_ok) => gen_rollback_ok(rollback_ok)(input),
})
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPMethod {
Select(Select),
SelectOk(SelectOk),
Commit(Commit),
CommitOk(CommitOk),
Rollback(Rollback),
RollbackOk(RollbackOk),
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Select {}
impl Select {
pub fn get_amqp_class_id(&self) -> Identifier {
90
}
pub fn get_amqp_method_id(&self) -> Identifier {
10
}
}
pub fn parse_select<I: ParsableInput>(i: I) -> ParserResult<I, Select> {
Ok((i, Select {}))
}
pub fn gen_select<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a Select,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(10)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct SelectOk {}
impl SelectOk {
pub fn get_amqp_class_id(&self) -> Identifier {
90
}
pub fn get_amqp_method_id(&self) -> Identifier {
11
}
}
pub fn parse_select_ok<I: ParsableInput>(i: I) -> ParserResult<I, SelectOk> {
Ok((i, SelectOk {}))
}
pub fn gen_select_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a SelectOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(11)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Commit {}
impl Commit {
pub fn get_amqp_class_id(&self) -> Identifier {
90
}
pub fn get_amqp_method_id(&self) -> Identifier {
20
}
}
pub fn parse_commit<I: ParsableInput>(i: I) -> ParserResult<I, Commit> {
Ok((i, Commit {}))
}
pub fn gen_commit<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a Commit,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(20)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CommitOk {}
impl CommitOk {
pub fn get_amqp_class_id(&self) -> Identifier {
90
}
pub fn get_amqp_method_id(&self) -> Identifier {
21
}
}
pub fn parse_commit_ok<I: ParsableInput>(i: I) -> ParserResult<I, CommitOk> {
Ok((i, CommitOk {}))
}
pub fn gen_commit_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a CommitOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(21)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Rollback {}
impl Rollback {
pub fn get_amqp_class_id(&self) -> Identifier {
90
}
pub fn get_amqp_method_id(&self) -> Identifier {
30
}
}
pub fn parse_rollback<I: ParsableInput>(i: I) -> ParserResult<I, Rollback> {
Ok((i, Rollback {}))
}
pub fn gen_rollback<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a Rollback,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(30)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct RollbackOk {}
impl RollbackOk {
pub fn get_amqp_class_id(&self) -> Identifier {
90
}
pub fn get_amqp_method_id(&self) -> Identifier {
31
}
}
pub fn parse_rollback_ok<I: ParsableInput>(i: I) -> ParserResult<I, RollbackOk> {
Ok((i, RollbackOk {}))
}
pub fn gen_rollback_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a RollbackOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(31)(input)?;
Ok(input)
}
}
}
pub mod confirm {
use super::*;
pub fn parse_confirm<I: ParsableInput>(i: I) -> ParserResult<I, confirm::AMQPMethod> {
context(
"parse_confirm",
map_opt(
flat_map(parse_id, |id| {
move |i| match id {
10 => context(
"parse_select",
map(map(parse_select, AMQPMethod::Select), Some),
)(i),
11 => context(
"parse_select_ok",
map(map(parse_select_ok, AMQPMethod::SelectOk), Some),
)(i),
_ => Ok((i, None)),
}
}),
std::convert::identity,
),
)(i)
}
pub fn gen_confirm<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a AMQPMethod,
) -> impl SerializeFn<W> + 'a {
cookie_factory::sequence::pair(gen_id(85), move |input| match *method {
AMQPMethod::Select(ref select) => gen_select(select)(input),
AMQPMethod::SelectOk(ref select_ok) => gen_select_ok(select_ok)(input),
})
}
#[derive(Clone, Debug, PartialEq)]
pub enum AMQPMethod {
Select(Select),
SelectOk(SelectOk),
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Select {
pub nowait: Boolean,
}
impl Select {
pub fn get_amqp_class_id(&self) -> Identifier {
85
}
pub fn get_amqp_method_id(&self) -> Identifier {
10
}
}
pub fn parse_select<I: ParsableInput>(i: I) -> ParserResult<I, Select> {
let (i, flags) = parse_flags(i, &["nowait"])?;
Ok((
i,
Select {
nowait: flags.get_flag("nowait").unwrap_or(false),
},
))
}
pub fn gen_select<'a, W: Write + BackToTheBuffer + 'a>(
method: &'a Select,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
let mut flags = AMQPFlags::default();
flags.add_flag("nowait".to_string(), method.nowait);
input = gen_id(10)(input)?;
input = gen_flags(&flags)(input)?;
Ok(input)
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub struct SelectOk {}
impl SelectOk {
pub fn get_amqp_class_id(&self) -> Identifier {
85
}
pub fn get_amqp_method_id(&self) -> Identifier {
11
}
}
pub fn parse_select_ok<I: ParsableInput>(i: I) -> ParserResult<I, SelectOk> {
Ok((i, SelectOk {}))
}
pub fn gen_select_ok<'a, W: Write + BackToTheBuffer + 'a>(
_: &'a SelectOk,
) -> impl SerializeFn<W> + 'a {
move |mut input| {
input = gen_id(11)(input)?;
Ok(input)
}
}
}