amqpr-api 0.4.1

A Tokio futures-based AMQP API library
Documentation
extern crate amqpr_api;
extern crate amqpr_codec;
extern crate tokio_core;
extern crate futures;
extern crate bytes;
extern crate log;
extern crate log4rs;

use tokio_core::reactor::{Core, Interval};
use tokio_core::net::TcpStream;
use futures::{Future, Stream, Sink};

use bytes::Bytes;

use amqpr_codec::content_header::Properties;

use amqpr_api::{start_handshake, declare_exchange, open_channel, publish_sink};
use amqpr_api::exchange::declare::{ExchangeType, DeclareExchangeOption};
use amqpr_api::basic::publish::{PublishOption, PublishItem};
use amqpr_api::handshake::SimpleHandshaker;
use amqpr_api::errors::*;


/// Channel id passed to every channel-scoped call in this example.
const LOCAL_CHANNEL_ID: u16 = 42;
/// Name of the exchange that is declared and then published to.
const EXCHANGE_NAME: &str = "example";

fn main() {
    logger();

    let mut core = Core::new().unwrap();

    let handshaker = SimpleHandshaker {
        user: "guest".into(),
        pass: "guest".into(),
        virtual_host: "/".into(),
    };

    let future = TcpStream::connect(&"127.0.0.1:5672".parse().unwrap(), &core.handle())
        .map_err(|e| Error::from(e))
        .and_then(|socket| start_handshake(handshaker, socket))
        .and_then(|socket| open_channel(LOCAL_CHANNEL_ID, socket))
        .and_then(|socket| {
            let option = DeclareExchangeOption {
                name: EXCHANGE_NAME.into(),
                typ: ExchangeType::Fanout,
                is_passive: false,
                is_durable: false,
                is_auto_delete: true,
                is_internal: false,
            };
            declare_exchange(LOCAL_CHANNEL_ID, socket, option)
        })
        .map(move |socket| publish_sink(LOCAL_CHANNEL_ID, socket));

    let sink = core.run(future).unwrap();

    let stream = {
        let interval = Interval::new(::std::time::Duration::new(1, 0), &core.handle()).unwrap();
        interval.map(|()| publish_item())
    };

    let _ = core.run(sink.send_all(stream));
}


/// Builds the item sent on every timer tick.
///
/// The item targets `EXCHANGE_NAME` with an empty routing key, has both the
/// mandatory and immediate flags off, carries a freshly constructed
/// `Properties` header and a fixed static body.
fn publish_item() -> PublishItem {
    PublishItem {
        meta: PublishOption {
            exchange: EXCHANGE_NAME.into(),
            routing_key: "".into(),
            is_mandatory: false,
            is_immediate: false,
        },
        header: Properties::new(),
        body: Bytes::from_static(b"amqpr-api example"),
    }
}


/// Initializes log4rs with a single console appender registered as `"stdout"`
/// and attached to the root logger at `Info` level.
///
/// Panics if the configuration cannot be built or if `log4rs::init_config`
/// returns an error.
fn logger() {
    use log::LogLevelFilter;
    use log4rs::append::console::ConsoleAppender;
    use log4rs::config::{Appender, Config, Root};

    let console = ConsoleAppender::builder().build();
    let appender = Appender::builder().build("stdout", Box::new(console));
    // The root logger refers to the appender by the name it was registered under.
    let root = Root::builder().appender("stdout").build(LogLevelFilter::Info);

    let config = Config::builder().appender(appender).build(root).unwrap();

    log4rs::init_config(config).unwrap();
}