//! STREAM socket module for exchanging raw TCP data with non-ZMQ peers
//!
//! Use the [`stream`] function to instantiate a stream socket and use methods
//! from the [`Stream`]/[`StreamExt`] traits.
//!
//! A stream socket is for interacting with TCP data from sources other than
//! ZMQ. It is not compatible with any of the other ZMQ socket types.
//!
//! # Example
//!
//! ```no_run
//! use async_zmq::{Result, StreamExt};
//!
//! #[async_std::main]
//! async fn main() -> Result<()> {
//!     let mut zmq = async_zmq::stream("tcp://127.0.0.1:5555")?.bind()?;
//!
//!     while let Some(msg) = zmq.next().await {
//!         // Each received item is a `Result<Multipart, RecvError>`
//!         let msg = msg?;
//!
//!         println!("{:?}", msg.iter());
//!     }
//!     Ok(())
//! }
//! ```
//!
//! [`stream`]: fn.stream.html
//! [`Stream`]: ../trait.Stream.html
//! [`StreamExt`]: ../trait.StreamExt.html
use std::pin::Pin;
use std::task::{Context, Poll};
use zmq::SocketType;
use crate::{
reactor::{AsRawSocket, ZmqSocket},
socket::{Multipart, Receiver, SocketBuilder},
RecvError, SocketError, Stream,
};
/// Start building a ZMQ socket of the STREAM type for the given endpoint.
///
/// Returns a [`SocketBuilder`] constructed with `SocketType::STREAM` and
/// `endpoint`. As written, this function always returns `Ok`.
pub fn stream(endpoint: &str) -> Result<SocketBuilder<'_, ZmqStream>, SocketError> {
    let builder = SocketBuilder::new(SocketType::STREAM, endpoint);
    Ok(builder)
}
/// The async wrapper of ZMQ socket with STREAM type.
///
/// This is a newtype over the crate's `Receiver`; incoming messages are
/// obtained by polling it through its `Stream` implementation.
pub struct ZmqStream(Receiver);
impl From<zmq::Socket> for ZmqStream {
fn from(socket: zmq::Socket) -> Self {
Self(Receiver {
socket: ZmqSocket::from(socket),
})
}
}
impl Stream for ZmqStream {
    type Item = Result<Multipart, RecvError>;

    /// Poll the wrapped receiver for the next multipart message.
    ///
    /// Readiness and end-of-stream are forwarded unchanged; only the error
    /// side of a ready item is converted via `Into` to [`RecvError`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let receiver = &mut self.get_mut().0;
        match Pin::new(receiver).poll_next(cx) {
            Poll::Ready(Some(result)) => Poll::Ready(Some(result.map_err(Into::into))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}
impl ZmqStream {
/// Represent as `Socket` from zmq crate in case you want to call its methods.
pub fn as_raw_socket(&self) -> &zmq::Socket {
self.0.socket.as_socket()
}
}