unicorn-rpc 0.0.1

Internal communication distribution abstraction for unicorn.
Documentation
//! Internal communication distribution abstraction for `unicorn`.

extern crate nanomsg;
extern crate unicorn_messages;

pub use nanomsg::Protocol;

use nanomsg::{Socket, Endpoint, Result};
use unicorn_messages::{Msg, encode_bytes, decode};
use std::io::Read;

/// How an `RPC` socket is attached to its address.
///
/// Passed to `RPC::new` / `RPC::new_for_device` to choose between
/// connecting to an address and binding to it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RPCType {
    /// Attach by calling `connect` on the socket with the given address.
    Connect,
    /// Attach by calling `bind` on the socket with the given address.
    Bind,
}

/// Performs RPC-like data exchange over a `nanomsg` socket.
///
/// Bundles the socket with the endpoint obtained by connecting or binding it
/// and remembers the `Protocol` the socket was created with. The protocol is
/// chosen by the caller when the instance is constructed.
pub struct RPC {
    // Underlying `nanomsg` socket used for every send and receive.
    socket: Socket,
    // Endpoint returned by connecting/binding the socket; shut down by `close`.
    endpoint: Endpoint,
    // Protocol given at construction time; reported by `get_socket_type`.
    socket_type: Protocol
}

impl RPC {
    fn create_socket(socktype: Protocol) -> Socket {
        match Socket::new(socktype) {
            Ok(s) => s,
            Err(e) => panic!("[rpc] Error creating {:?} socket. Error: {}", socktype, e),
        }
    }

    fn create_device_socket(socktype: Protocol) -> Socket {
        match Socket::new_for_device(socktype) {
            Ok(s) => s,
            Err(e) => panic!("[rpc] Error creating device socket for {:?}. Error: {}", socktype, e),
        }
    }

    fn connect(mut socket: Socket, addr: &str) -> (Socket, Endpoint) {
        let endpoint = match socket.connect(addr.as_ref()) {
            Ok(ce) => {
                ce
            }
            Err(e) => panic!("[rpc] Unable to connect to remote Rep endpoint: {}\nError: {}", &addr, e),
        };
        (socket, endpoint)
    }

    fn bind(mut socket: Socket, addr: &str) -> (Socket, Endpoint) {
        let endpoint = match socket.bind(addr.as_ref()) {
            Ok(ce) => {
                ce
            }
            Err(e) => panic!("[rpc] Unable to bind Rep endpoint: {}\nError: {}", &addr, e),
        };
        (socket, endpoint)
    }

    fn generate(socket: Socket, sock_type: Protocol, conn_type: RPCType, addr: &str) -> RPC {
        let (socket, endpoint) = match conn_type {
            RPCType::Connect => RPC::connect(socket, addr),
            RPCType::Bind => RPC::bind(socket, addr),
        };
        RPC {
            socket: socket,
            endpoint: endpoint,
            socket_type: sock_type
        }
    }

    /// Link two sockets using a `nanomsg`
    /// [`device`](http://nanomsg.org/v0.8/nn_device.3.HTML).
    pub fn link(&self, s2: &RPC) -> Result<()> {
        Ok(try!(Socket::device(&self.socket, &s2.socket)))
    }

    /// Create a new `RPC` instance
    pub fn new(sock_type: Protocol, conn_type: RPCType, addr: &str) -> RPC {
        let socket = RPC::create_socket(sock_type);
        RPC::generate(socket, sock_type, conn_type, addr)
    }

    /// Create a new `RPC` instance for use in `nanomsg` `device`
    pub fn new_for_device(sock_type: Protocol, conn_type: RPCType, addr: &str) -> RPC {
        let socket = RPC::create_device_socket(sock_type);
        RPC::generate(socket, sock_type, conn_type, addr)
    }

    /// Run a remote command by sending a `message` to remote socket
    /// and reading a reply for the operation from the remote socket.
    pub fn execute(&mut self, m: Msg) -> Result<Msg> {
        try!(self.send_msg(&m));
        let mut buff = String::new();
        try!(self.recv(&mut buff));
        match decode(&buff) {
            Some(msg) => Ok(msg),
            None => Ok(Msg::Error("Unable to decode message received from remote endpoint".to_string())),
        }
    }

    /// Receive from socket and write to given String
    pub fn recv(&mut self, buff: &mut String) -> Result<()> {
        try!(self.socket.read_to_string(buff));
        Ok(())
    }

    /// Send `Msg` on socket, after encoding `Msg` to byte vector.
    pub fn send_msg(&mut self, m: &Msg) -> Result<()> {
        try!(self.send(encode_bytes(m)));
        Ok(())
    }

    /// Send byte vector on socket
    pub fn send(&mut self, v: Vec<u8>) -> Result<()> {
        try!(self.socket.nb_write(v.as_slice()));
        Ok(())
    }

    /// Shutdown endpoint
    pub fn close(mut self) -> Result<()> {
        try!(self.endpoint.shutdown());
        Ok(())
    }

    /// Return socket type
    pub fn get_socket_type(&self) -> Protocol {
        self.socket_type
    }
}