pub mod errors;
use std::sync::Arc;
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
prelude::*,
};
use futures_zmq::{prelude::*, Sub};
use zmq::Context;
pub use errors::*;
#[derive(Clone, PartialEq)]
pub enum Topic {
RawTx,
HashTx,
RawBlock,
HashBlock,
}
pub struct ZMQListener {
subscriber: Sub,
}
impl ZMQListener {
pub async fn bind(addr: &str) -> Result<Self, ZMQError> {
let context = Arc::new(Context::new());
let sub_old = Sub::builder(context).connect(addr).filter(b"").build();
let subscriber = sub_old.compat().await?; let listener = ZMQListener { subscriber };
Ok(listener)
}
pub fn stream(self) -> impl Stream<Item = Result<Vec<u8>, SubscriptionError>> {
let stream = self.subscriber.stream().compat().map(move |multipart_res| {
let mut multipart = multipart_res?;
multipart.pop_front().ok_or(BitcoinError::MissingTopic)?;
let payload = multipart.pop_front().ok_or(BitcoinError::MissingPayload)?;
Ok(payload.to_vec())
});
Box::pin(stream)
}
pub fn stream_classified(
self,
) -> impl Stream<Item = Result<(Topic, Vec<u8>), SubscriptionError>> {
let stream = self.subscriber.stream().compat().map(move |multipart_res| {
let mut multipart = multipart_res?;
let raw_topic = multipart.pop_front().ok_or(BitcoinError::MissingTopic)?;
let payload = multipart.pop_front().ok_or(BitcoinError::MissingPayload)?;
let topic = match &*raw_topic {
b"rawtx" => Topic::RawTx,
b"hashtx" => Topic::HashTx,
b"rawblock" => Topic::RawBlock,
b"hashblock" => Topic::HashBlock,
_ => return Err(BitcoinError::UnexpectedTopic.into()),
};
Ok((topic, payload.to_vec()))
});
Box::pin(stream)
}
}