amqp 0.0.10

AMQP/RabbitMQ protocol client
use amqp_error::{AMQPResult, AMQPError};
use std::sync::mpsc::Receiver;

use framing::{ContentHeaderFrame, Frame, FrameType};
use table::Table;
use protocol;
use protocol::{MethodFrame, channel, basic};
use protocol::basic::BasicProperties;
use connection::Connection;
use std::collections::HashMap;
use std::cell::RefCell;
use std::rc::Rc;

pub trait Consumer : Send {
    fn handle_delivery(&mut self, channel: &mut Channel, method: basic::Deliver, headers: BasicProperties, body: Vec<u8>);
}

pub type ConsumerCallBackFn = fn(channel: &mut Channel, method: basic::Deliver, headers: BasicProperties, body: Vec<u8>);

impl Consumer for ConsumerCallBackFn {
    fn handle_delivery(&mut self, channel: &mut Channel, method: basic::Deliver, headers: BasicProperties, body: Vec<u8>) {
        self(channel, method, headers, body);
    }
}

pub struct Channel {
    pub id: u16,
    pub consumers: Rc<RefCell<HashMap<String, Box<Consumer>>>>,
    receiver: Receiver<AMQPResult<Frame>>,
    pub connection: Connection //TODO: Make private
}

unsafe impl Send for Channel {}

impl Channel {
    pub fn new(id: u16, receiver: Receiver<AMQPResult<Frame>>, connection: Connection) -> Channel {
        Channel{ id: id, receiver: receiver, consumers: Rc::new(RefCell::new(HashMap::new())), connection: connection }
    }

    pub fn open(&mut self) -> AMQPResult<protocol::channel::OpenOk> {
        let meth = protocol::channel::Open {out_of_band: "".to_string()};
        self.rpc(&meth, "channel.open-ok")
    }
    pub fn close(&mut self, reply_code: u16, reply_text: String) -> AMQPResult<channel::CloseOk> {
        let close = &channel::Close {reply_code: reply_code, reply_text: reply_text, class_id: 0, method_id: 0};
        self.rpc(close, "channel.close-ok")
    }

    pub fn read(&self) -> AMQPResult<Frame> {
        self.receiver.recv().map_err(|_|
            AMQPError::Protocol("Error reading packet from channel".to_owned())
        ).and_then(|frame| frame)
    }

    pub fn write(&mut self, frame: Frame) -> AMQPResult<()> {
        self.connection.write(frame)
    }

    pub fn send_method_frame<T>(&mut self, method: &T) -> AMQPResult<()> where T: protocol::Method {
        debug!("Sending method {} to channel {}", method.name(), self.id);
        let id = self.id;
        self.write(Frame {frame_type: FrameType::METHOD, channel: id, payload: try!(MethodFrame::encode_method(method)) })
    }

    pub fn rpc<T, U>(&mut self, method: &U, expected_reply: &str) -> AMQPResult<T> where T: protocol::Method, U: protocol::Method {
        let method_frame = try!(self.raw_rpc(method));
        match method_frame.method_name() {
            m_name if m_name == expected_reply => protocol::Method::decode(method_frame),
            m_name => Err(AMQPError::Protocol(format!("Unexpected method frame: {}, expected: {}", m_name, expected_reply)))
        }
    }

    pub fn raw_rpc<T>(&mut self, method: &T) -> AMQPResult<MethodFrame>  where T: protocol::Method {
        try!(self.send_method_frame(method));
        MethodFrame::decode(try!(self.read()))
    }

    pub fn read_headers(&mut self) -> AMQPResult<ContentHeaderFrame> {
        ContentHeaderFrame::decode(try!(self.read()))
    }

    pub fn read_body(&mut self, size: u64) -> AMQPResult<Vec<u8>> {
        let mut body = Vec::with_capacity(size as usize);
        while body.len() < size as usize {
            body.extend(try!(self.read()).payload.into_iter())
        }
        Ok(body)
    }

    pub fn exchange_declare(&mut self, exchange: &str, _type: &str, passive: bool, durable: bool,
        auto_delete: bool, internal: bool, nowait: bool, arguments: Table) -> AMQPResult<protocol::exchange::DeclareOk> {
        let declare = protocol::exchange::Declare {
            ticket: 0, exchange: exchange.to_string(), _type: _type.to_string(), passive: passive, durable: durable,
            auto_delete: auto_delete, internal: internal, nowait: nowait, arguments: arguments
        };
        self.rpc(&declare,"exchange.declare-ok")
    }

    pub fn exchange_bind(&mut self, destination: &str, source: &str,
                         routing_key: &str, arguments: Table) -> AMQPResult<protocol::exchange::BindOk> {
        let bind = protocol::exchange::Bind {
            ticket: 0, destination: destination.to_string(), source: source.to_string(),
            routing_key:routing_key.to_string(), nowait: false, arguments: arguments
        };
        self.rpc(&bind, "exchange.bind-ok")
    }

    pub fn queue_declare(&mut self, queue: &str, passive: bool, durable: bool, exclusive: bool,
        auto_delete: bool, nowait: bool, arguments: Table) -> AMQPResult<protocol::queue::DeclareOk> {
        let declare = protocol::queue::Declare {
            ticket: 0, queue: queue.to_string(), passive: passive, durable: durable, exclusive: exclusive,
            auto_delete: auto_delete, nowait: nowait, arguments: arguments
        };
        self.rpc(&declare, "queue.declare-ok")
    }

    pub fn queue_bind(&mut self, queue: &str, exchange: &str, routing_key: &str, nowait: bool, arguments: Table) -> AMQPResult<protocol::queue::BindOk> {
        let bind = protocol::queue::Bind {
            ticket: 0, queue: queue.to_string(), exchange: exchange.to_string(),
            routing_key: routing_key.to_string(), nowait: nowait, arguments: arguments
        };
        self.rpc(&bind, "queue.bind-ok")
    }
}