Crate tokio_zmq [] [src]

Tokio ZMQ, bringing Zero MQ to the Tokio event loop

This crate provides Streams, Sinks, and Futures for Zero MQ Sockets, which deal in structures caled Multiparts. Currently, a Multipart is a simple VecDequezmq::Message, but possibly in the future this can be represented as a struct, or VecDeque with the zmq 0.9 release.

Creating a socket

To get a new socket, you must invoke the Socket builder. The Socket Builder can output a 'raw' Socket, or any specific kind of socket, such as Rep, Req, etc. The result of the builder can be any compatable kind of socket, so specifiying a type is important.

Once you have a socket, if it implements StreamSocket, you can use the socket's .stream(), if it implements SinkSocket, you can use the socket's .sink(), and if it implements FutureSocket, you can use the send and recv methods.

Without further ado, creating and using a socket:

#![feature(try_from)]

extern crate zmq;
extern crate futures;
extern crate tokio_core;
extern crate tokio_zmq;

use std::convert::TryInto;
use std::rc::Rc;

use futures::Stream;
use tokio_core::reactor::Core;
use tokio_zmq::prelude::*;
use tokio_zmq::{Socket, Pub, Sub, Error};

fn run() -> Result<(), Error> {
    // Create a new Event Loop. Typically this will happen somewhere near the start of your
    // application.
    let mut core = Core::new()?;

    // Create a new ZeroMQ Context. This context will be used to create all the sockets.
    let context = Rc::new(zmq::Context::new());

    // Create our two sockets using the Socket builder pattern.
    // Note that the variable is named zpub, since pub is a keyword
    let zpub: Pub = Socket::new(Rc::clone(&context), core.handle())
        .bind("tcp://*:5561")
        .try_into()?;

    let sub: Sub = Socket::new(context, core.handle())
        .bind("tcp://*:5562")
        .filter(b"")
        .try_into()?;

    // Create our simple server. This forwards messages from the Subscriber socket to the
    // Publisher socket, and prints them as they go by.
    let runner = sub.stream()
        .map(|multipart| {
            for msg in &multipart {
                if let Some(msg) = msg.as_str() {
                    println!("Forwarding: {}", msg);
                }
            }
            multipart
        })
        .forward(zpub.sink::<Error>());

    // To avoid an infinte doctest, the actual core.run is commented out.
    // core.run(runner)?;
}

Reexports

pub use async::ControlHandler;
pub use self::error::Error;
pub use socket::Socket;
pub use socket::Rep;
pub use socket::Req;
pub use socket::Pub;
pub use socket::Sub;
pub use socket::Push;
pub use socket::Pull;
pub use socket::Xpub;
pub use socket::Xsub;
pub use socket::Pair;
pub use socket::RepControlled;
pub use socket::SubControlled;
pub use socket::PullControlled;
pub use socket::XpubControlled;
pub use socket::XsubControlled;
pub use socket::PairControlled;

Modules

async

This module contains the code that makes Tokio ZMQ Asynchronous. There's the future module, which defines Request and Response futures for ZeroMQ Sockets, the stream module, which defines receiving data from a socket as an asychronous stream, and the sink module, which defines sending data to a socket as an asychronous sink.

error
file
prelude

Re-export all important traits to make developing with this library easier

socket

This module contains useful traits and types for working with ZeroMQ Sockets.