Crate tokio_zmq[−][src]
Tokio ZMQ, bringing ZeroMQ to the Tokio event loop
This crate provides Streams, Sinks, and Futures for ZeroMQ Sockets, which deal in structures
caled Multiparts. Currently, a Multipart is a simple wrapper around VecDeque<zmq::Message>
,
but in the future this will be represented as a wrapper around VecDeque<S: zmq::Sendable>
with the zmq 0.9 release.
Creating a socket
To get a new socket, you must invoke the Socket builder. The Socket Builder can output a 'raw' Socket, or any specific kind of socket, such as Rep, Req, etc. The result of the builder can be any compatable kind of socket, so specifiying a type is important.
Once you have a socket, if it implements StreamSocket
, you can use the socket's .stream()
, if
it implements SinkSocket
, you can use the socket's .sink()
, and if it implements
FutureSocket
, you can use the send
and recv
methods.
Without further ado, creating and using a socket:
#![feature(try_from)] extern crate zmq; extern crate futures; extern crate tokio; extern crate tokio_zmq; use std::{convert::TryInto, sync::Arc}; use futures::{Future, Stream}; use tokio_zmq::{prelude::*, Socket, Pub, Sub, Error}; fn run() -> Result<(), Error> { // Create a new ZeroMQ Context. This context will be used to create all the sockets. let context = Arc::new(zmq::Context::new()); // Create our two sockets using the Socket builder pattern. // Note that the variable is named zpub, since pub is a keyword let zpub: Pub = Socket::builder(Arc::clone(&context)) .bind("tcp://*:5561") .try_into()?; let sub: Sub = Socket::builder(context) .bind("tcp://*:5562") .filter(b"") .try_into()?; // Create our simple server. This forwards messages from the Subscriber socket to the // Publisher socket, and prints them as they go by. let runner = sub.stream() .map(|multipart| { for msg in &multipart { if let Some(msg) = msg.as_str() { println!("Forwarding: {}", msg); } } multipart }) .forward(zpub.sink()); // To avoid an infinte doctest, the actual core.run is commented out. // tokio::runtime::run2(runner.map(|_| ()).or_else(|e| { // println!("Error: {}", e); // })?; }
Re-exports
pub use self::socket::types::Dealer; |
pub use self::socket::types::Pair; |
pub use self::socket::types::Pub; |
pub use self::socket::types::Pull; |
pub use self::socket::types::Push; |
pub use self::socket::types::Rep; |
pub use self::socket::types::Req; |
pub use self::socket::types::Router; |
pub use self::socket::types::Sub; |
pub use self::socket::types::Xpub; |
pub use self::socket::types::Xsub; |
pub use self::socket::Socket; |
Modules
async |
This module contains the code that makes Tokio ZMQ Asynchronous. There's the |
file |
This module contains definitions for the |
prelude |
Provide useful types and traits for working with Tokio ZMQ. |
socket |
This module contains useful traits and types for working with ZeroMQ Sockets. |
Structs
Multipart |
This type is used for receiving and sending messages in Multipart groups. An application could make using this easier by implementing traits as follows: |
Enums
Error |
Defines the error type for Tokio ZMQ. |