Expand description
Futures ZMQ, bringing ZeroMQ any futures runtime.
Futures ZMQ contains wrappers around ZeroMQ Concepts with Futures. It shares an external API with tokio-zmq, but unlike tokio-zmq, futures-zmq is OS and Executor agnostic. This comes at the cost of performance, as futures-zmq relies on spinning up a separate thread for managing the ZeroMQ sockets, while tokio-zmq can avoid this issue by letting mio manage the sockets.
This crate provides Streams, Sinks, and Futures for ZeroMQ Sockets, which deal in structures
caled Multiparts. Currently, a Multipart is a simple wrapper around VecDeque<zmq::Message>
,
but in the future this will be represented as a wrapper around VecDeque<S: zmq::Sendable>
with the zmq 0.9 release.
§Creating a socket
To get a new socket, you must invoke the Socket builder. The Socket Builder can output a ‘raw’ Socket, or any specific kind of socket, such as Rep, Req, etc. The result of the builder can be any compatable kind of socket, so specifiying a type is important.
Once you have a socket, if it implements StreamSocket
, you can use the socket’s .stream()
and .recv()
, if it implements SinkSocket
, you can use the socket’s .sink(usize)
and
.send(Multipart)
.
Without further ado, creating and using a socket:
extern crate zmq;
extern crate futures;
extern crate tokio;
extern crate futures_zmq;
use std::sync::Arc;
use futures::{Future, Stream};
use futures_zmq::{prelude::*, Socket, Pub, Sub, Error};
fn run() -> Result<(), Error> {
// Create a new ZeroMQ Context. This context will be used to create all the sockets.
let context = Arc::new(zmq::Context::new());
// Create our two sockets using the Socket builder pattern.
// Note that the variable is named zpub, since pub is a keyword
let zpub = Pub::builder(Arc::clone(&context))
.bind("tcp://*:5561")
.build();
let sub = Sub::builder(context)
.bind("tcp://*:5562")
.filter(b"")
.build();
// Create our simple server. This forwards messages from the Subscriber socket to the
// Publisher socket, and prints them as they go by.
let runner = zpub
.join(sub)
.and_then(|(zpub, sub)| {
sub.stream()
.map(|multipart| {
for msg in &multipart {
if let Some(msg) = msg.as_str() {
println!("Forwarding: {}", msg);
}
}
multipart
})
.forward(zpub.sink(25))
});
// To avoid an infinte doctest, the actual tokio::run is commented out.
// tokio::run(runner.map(|_| ()).or_else(|e| {
// println!("Error: {}", e);
// })?;
}
Re-exports§
pub use self::error::Error;
Modules§
- async_
types - error
- prelude
- Provide useful types and traits for working with Futures ZMQ.
Structs§
- Dealer
- The DEALER
SocketType
wrapper type. - Multipart
- This type is used for receiving and sending messages in Multipart groups. An application could make using this easier by implementing traits as follows:
- Pair
- The PAIR
SocketType
wrapper type. - Pub
- The PUB
SocketType
wrapper type - Pull
- The PULL
SocketType
wrapper type - Push
- The PUSH
SocketType
wrapper type - Recv
Future - Rep
- The REP
SocketType
wrapper type - Req
- The REQ
SocketType
wrapper type - Router
- The ROUTER
SocketType
wrapper type - SESSION
- Send
Future - Session
- Socket
- Defines the raw Socket type. This type should never be interacted with directly, except to create new instances of wrapper types.
- Sub
- The SUB
SocketType
wrapper type - Xpub
- The XPUB
SocketType
wrapper type - Xsub
- The XSUB
SocketType
wrapper type