Crate tokio_zmq

Tokio ZMQ, bringing ZeroMQ to the Tokio event loop

This crate provides Streams, Sinks, and Futures for ZeroMQ Sockets, which deal in structures called Multiparts. Currently, a Multipart is a simple wrapper around VecDeque<zmq::Message>. With the zmq 0.9 release, this will become a wrapper around VecDeque<S: zmq::Sendable>.
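Because a Multipart is essentially an ordered queue of frames, an application can keep its own message types and convert at the socket boundary. The following is a minimal sketch of that idea using only the standard library. The `Multipart` defined here is a stand-in (a queue of byte vectors rather than zmq::Message), and `Envelope` and `DecodeError` are hypothetical names chosen for illustration; none of this is the crate's actual implementation.

```rust
use std::collections::VecDeque;
use std::convert::TryFrom;

/// Stand-in for a multipart message: an ordered queue of frames.
/// Frames are plain byte vectors here, not zmq::Message values.
#[derive(Debug, Default, PartialEq)]
pub struct Multipart(VecDeque<Vec<u8>>);

impl Multipart {
    /// Append a frame to the end of the message.
    pub fn push_back(&mut self, frame: Vec<u8>) {
        self.0.push_back(frame);
    }

    /// Remove and return the first frame, if any.
    pub fn pop_front(&mut self) -> Option<Vec<u8>> {
        self.0.pop_front()
    }

    /// Number of frames currently in the message.
    pub fn len(&self) -> usize {
        self.0.len()
    }
}

/// Hypothetical application message: a topic frame followed by a body frame.
#[derive(Debug, Clone, PartialEq)]
pub struct Envelope {
    pub topic: String,
    pub body: Vec<u8>,
}

/// Hypothetical error for multiparts that do not match the Envelope layout.
#[derive(Debug, PartialEq)]
pub enum DecodeError {
    WrongFrameCount(usize),
    TopicNotUtf8,
}

/// Encoding cannot fail, so a plain `From` is enough.
impl From<Envelope> for Multipart {
    fn from(env: Envelope) -> Self {
        let mut multipart = Multipart::default();
        multipart.push_back(env.topic.into_bytes());
        multipart.push_back(env.body);
        multipart
    }
}

/// Decoding can fail, so it is expressed as `TryFrom`.
impl TryFrom<Multipart> for Envelope {
    type Error = DecodeError;

    fn try_from(mut multipart: Multipart) -> Result<Self, Self::Error> {
        let frames = multipart.len();
        match (multipart.pop_front(), multipart.pop_front()) {
            (Some(topic), Some(body)) if frames == 2 => {
                let topic = String::from_utf8(topic)
                    .map_err(|_| DecodeError::TopicNotUtf8)?;
                Ok(Envelope { topic, body })
            }
            _ => Err(DecodeError::WrongFrameCount(frames)),
        }
    }
}
```

With conversions like these in place, application code can call `Multipart::from(envelope)` before sending and `Envelope::try_from(multipart)` after receiving, so the frame layout is defined in one place.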

Creating a socket

To get a new socket, you must invoke the Socket builder. The Socket builder can output a 'raw' Socket or any specific kind of socket, such as Rep or Req. The result of the builder can be any compatible kind of socket, so specifying a type is important.

Once you have a socket, the methods available depend on the traits it implements. If it implements StreamSocket, you can use the socket's .stream() and .recv(). If it implements SinkSocket, you can use the socket's .sink(usize) and .send(Multipart).

Without further ado, creating and using a socket:

extern crate zmq;
extern crate futures;
extern crate tokio;
extern crate tokio_zmq;

use std::sync::Arc;

use futures::{Future, Stream};
use tokio_zmq::{prelude::*, Socket, Pub, Sub, Error};

fn run() -> Result<(), Error> {
    // Create a new ZeroMQ Context. This context will be used to create all the sockets.
    let context = Arc::new(zmq::Context::new());

    // Create our two sockets using the Socket builder pattern.
    // Note that the variable is named zpub, since pub is a keyword
    let zpub = Pub::builder(Arc::clone(&context))
        .bind("tcp://*:5561")
        .build();

    let sub = Sub::builder(context)
        .bind("tcp://*:5562")
        .filter(b"")
        .build();

    // Create our simple server. This forwards messages from the Subscriber socket to the
    // Publisher socket, and prints them as they go by.
    let runner = zpub
        .join(sub)
        .and_then(|(zpub, sub)| {
            sub.stream()
                .map(|multipart| {
                    for msg in &multipart {
                        if let Some(msg) = msg.as_str() {
                            println!("Forwarding: {}", msg);
                        }
                    }
                    multipart
                })
                .forward(zpub.sink(25))
        });

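    // To avoid an infinite doctest, the actual tokio::run is commented out.
    // tokio::run(runner.map(|_| ()).or_else(|e| {
    //     println!("Error: {}", e);
    //     Ok(())
    // }));

    // The future is dropped without being run; report success to the caller.
    let _ = runner;
    Ok(())
}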

Modules

async_types

This module contains the code that makes Tokio ZMQ asynchronous. The future module defines Request and Response futures for ZeroMQ Sockets, the stream module defines receiving data from a socket as an asynchronous stream, and the sink module defines sending data to a socket as an asynchronous sink.

prelude

Provides useful types and traits for working with Tokio ZMQ.

Structs

Dealer

The DEALER SocketType wrapper type.

Multipart

This type is used for receiving and sending messages in Multipart groups. An application could make using this easier by implementing traits that convert its own types to and from Multipart.

Pair

The PAIR SocketType wrapper type.

Pub

The PUB SocketType wrapper type.

Pull

The PULL SocketType wrapper type.

Push

The PUSH SocketType wrapper type.

Rep

The REP SocketType wrapper type.

Req

The REQ SocketType wrapper type.

Router

The ROUTER SocketType wrapper type.

Socket

Defines the raw Socket type. This type should never be interacted with directly, except to create new instances of wrapper types.

Sub

The SUB SocketType wrapper type.

Xpub

The XPUB SocketType wrapper type.

Xsub

The XSUB SocketType wrapper type.

Enums

Error

Defines the error type for Tokio ZMQ.