use anyhow::{Context, Result};
use blvm_protocol::wire::{serialize_block, serialize_tx};
use blvm_protocol::{Block, Hash, Transaction};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
#[cfg(feature = "zmq")]
use zeromq::{Context as ZmqContext, Socket, PUB};
/// Publishes block and transaction notifications over ZMQ PUB sockets.
///
/// Only compiled when the `zmq` feature is enabled. Every notification topic has an
/// optional socket field; when the field is `None` the matching `publish_*` method
/// returns `Ok(())` without sending anything.
#[cfg(feature = "zmq")]
pub struct ZmqPublisher {
/// ZMQ context the sockets are created from; stored alongside them.
context: Arc<ZmqContext>,
/// Socket for the `hashblock` topic, `None` when no endpoint is configured.
hashblock_socket: Option<Arc<Socket>>,
/// Socket for the `hashtx` topic, `None` when no endpoint is configured.
hashtx_socket: Option<Arc<Socket>>,
/// Socket for the `rawblock` topic, `None` when no endpoint is configured.
rawblock_socket: Option<Arc<Socket>>,
/// Socket for the `rawtx` topic, `None` when no endpoint is configured.
rawtx_socket: Option<Arc<Socket>>,
/// Socket for the `sequence` topic, `None` when no endpoint is configured.
sequence_socket: Option<Arc<Socket>>,
/// Counter bumped (wrapping) by `publish_sequence` each time it sends; starts at 0.
/// It is currently only written to the debug log, not to the wire.
sequence: Arc<RwLock<u32>>,
}
#[cfg(feature = "zmq")]
impl ZmqPublisher {
    /// Creates a publisher with a bound PUB socket for every endpoint set in `config`.
    ///
    /// Topics whose endpoint is `None` get no socket and are skipped when publishing.
    /// Topics that name the *same* endpoint string share a single socket: binding one
    /// address twice fails with "address in use", and the topic frame that starts
    /// every message already lets subscribers tell the notifications apart.
    ///
    /// # Errors
    ///
    /// Returns an error if a socket cannot be created or bound to its endpoint.
    pub fn new(config: &ZmqConfig) -> Result<Self> {
        let context = Arc::new(ZmqContext::new());
        // Sockets bound so far, keyed by the endpoint string exactly as configured.
        let mut bound = std::collections::HashMap::new();

        let hashblock_socket = Self::socket_for(
            &context,
            &mut bound,
            config.hashblock.as_deref(),
            "hashblock",
        )?;
        let hashtx_socket =
            Self::socket_for(&context, &mut bound, config.hashtx.as_deref(), "hashtx")?;
        let rawblock_socket = Self::socket_for(
            &context,
            &mut bound,
            config.rawblock.as_deref(),
            "rawblock",
        )?;
        let rawtx_socket =
            Self::socket_for(&context, &mut bound, config.rawtx.as_deref(), "rawtx")?;
        let sequence_socket = Self::socket_for(
            &context,
            &mut bound,
            config.sequence.as_deref(),
            "sequence",
        )?;

        Ok(Self {
            context,
            hashblock_socket,
            hashtx_socket,
            rawblock_socket,
            rawtx_socket,
            sequence_socket,
            sequence: Arc::new(RwLock::new(0u32)),
        })
    }

    /// Resolves the socket a topic should publish on.
    ///
    /// Returns `Ok(None)` when the topic has no endpoint. Otherwise returns the socket
    /// already bound to `endpoint` if there is one, or binds a new socket and records
    /// it in `bound` so later topics with the same endpoint reuse it.
    fn socket_for(
        context: &ZmqContext,
        bound: &mut std::collections::HashMap<String, Arc<Socket>>,
        endpoint: Option<&str>,
        topic: &str,
    ) -> Result<Option<Arc<Socket>>> {
        let endpoint = match endpoint {
            Some(endpoint) => endpoint,
            None => return Ok(None),
        };
        if let Some(shared) = bound.get(endpoint) {
            info!(
                "ZMQ {} notifications share the socket bound to {}",
                topic, endpoint
            );
            return Ok(Some(Arc::clone(shared)));
        }
        let socket = Arc::new(Self::create_socket(context, endpoint, topic)?);
        bound.insert(endpoint.to_owned(), Arc::clone(&socket));
        Ok(Some(socket))
    }

    /// Creates a PUB socket on `context` and binds it to `endpoint`.
    ///
    /// `topic` is used only for the log line and the error context.
    fn create_socket(context: &ZmqContext, endpoint: &str, topic: &str) -> Result<Socket> {
        let socket = context.socket(PUB)?;
        socket
            .bind(endpoint)
            .with_context(|| format!("Failed to bind ZMQ socket for {topic} to {endpoint}"))?;
        info!("ZMQ {} socket bound to {}", topic, endpoint);
        Ok(socket)
    }

    /// Sends one two-frame notification: the topic name, then the payload.
    fn send_notification(socket: &Socket, topic: &'static str, payload: &[u8]) -> Result<()> {
        socket.send(topic, zeromq::SNDMORE)?;
        socket.send(payload, 0)?;
        Ok(())
    }

    /// Publishes a `hashblock` notification whose payload is the block hash bytes.
    ///
    /// Does nothing when no `hashblock` endpoint is configured.
    ///
    /// # Errors
    ///
    /// Returns an error if sending either frame fails.
    pub async fn publish_hashblock(&self, block_hash: &Hash) -> Result<()> {
        if let Some(socket) = self.hashblock_socket.as_deref() {
            Self::send_notification(socket, "hashblock", block_hash.as_slice())?;
            debug!("Published hashblock notification: {:?}", block_hash);
        }
        Ok(())
    }

    /// Publishes a `hashtx` notification whose payload is the transaction hash bytes.
    ///
    /// Does nothing when no `hashtx` endpoint is configured.
    ///
    /// # Errors
    ///
    /// Returns an error if sending either frame fails.
    pub async fn publish_hashtx(&self, tx_hash: &Hash) -> Result<()> {
        if let Some(socket) = self.hashtx_socket.as_deref() {
            Self::send_notification(socket, "hashtx", tx_hash.as_slice())?;
            debug!("Published hashtx notification: {:?}", tx_hash);
        }
        Ok(())
    }

    /// Publishes a `rawblock` notification whose payload is the serialized block.
    ///
    /// Does nothing (and does not serialize) when no `rawblock` endpoint is configured.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization or sending fails.
    pub async fn publish_rawblock(&self, block: &Block) -> Result<()> {
        if let Some(socket) = self.rawblock_socket.as_deref() {
            let block_data = serialize_block(block).map_err(|e| anyhow::anyhow!("{e}"))?;
            Self::send_notification(socket, "rawblock", &block_data)?;
            debug!(
                "Published rawblock notification: {} bytes",
                block_data.len()
            );
        }
        Ok(())
    }

    /// Publishes a `rawtx` notification whose payload is the serialized transaction.
    ///
    /// Does nothing (and does not serialize) when no `rawtx` endpoint is configured.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization or sending fails.
    pub async fn publish_rawtx(&self, tx: &Transaction) -> Result<()> {
        if let Some(socket) = self.rawtx_socket.as_deref() {
            let tx_data = serialize_tx(tx).map_err(|e| anyhow::anyhow!("{e}"))?;
            Self::send_notification(socket, "rawtx", &tx_data)?;
            debug!("Published rawtx notification: {} bytes", tx_data.len());
        }
        Ok(())
    }

    /// Publishes a `sequence` notification for a transaction.
    ///
    /// The payload is one tag byte (`0x01` when `is_mempool_entry` is true, `0x02`
    /// otherwise) followed by the transaction hash bytes. Does nothing when no
    /// `sequence` endpoint is configured.
    ///
    /// # Errors
    ///
    /// Returns an error if sending either frame fails. The counter has already been
    /// incremented by then.
    pub async fn publish_sequence(&self, tx_hash: &Hash, is_mempool_entry: bool) -> Result<()> {
        if let Some(socket) = self.sequence_socket.as_deref() {
            // The write guard stays alive until the end of this block, so the counter
            // bump and the send of one notification are not interleaved with another
            // `publish_sequence` call.
            let mut counter = self.sequence.write().await;
            *counter = counter.wrapping_add(1);
            let sequence_num = *counter;

            // NOTE(review): the counter is only logged, never sent, and the layout
            // here (tag byte first) appears to differ from Bitcoin Core's `sequence`
            // topic — confirm what subscribers of this node expect before changing it.
            let mut data = Vec::with_capacity(33);
            data.push(if is_mempool_entry { 0x01 } else { 0x02 });
            data.extend_from_slice(tx_hash.as_slice());
            Self::send_notification(socket, "sequence", &data)?;
            debug!(
                "Published sequence notification: seq={}, tx={:?}, entry={}",
                sequence_num, tx_hash, is_mempool_entry
            );
        }
        Ok(())
    }

    /// Publishes the `hashblock` and `rawblock` notifications for a block.
    ///
    /// Best effort: a failure of either notification is logged as a warning and the
    /// other is still attempted. This method therefore always returns `Ok(())`.
    pub async fn publish_block(&self, block: &Block, block_hash: &Hash) -> Result<()> {
        if let Err(error) = self.publish_hashblock(block_hash).await {
            warn!("Failed to publish hashblock notification: {}", error);
        }
        if let Err(error) = self.publish_rawblock(block).await {
            warn!("Failed to publish rawblock notification: {}", error);
        }
        Ok(())
    }

    /// Publishes the `hashtx`, `rawtx` and `sequence` notifications for a transaction.
    ///
    /// Best effort: a failure of any notification is logged as a warning and the
    /// remaining ones are still attempted. This method therefore always returns
    /// `Ok(())`.
    pub async fn publish_transaction(
        &self,
        tx: &Transaction,
        tx_hash: &Hash,
        is_mempool_entry: bool,
    ) -> Result<()> {
        if let Err(error) = self.publish_hashtx(tx_hash).await {
            warn!("Failed to publish hashtx notification: {}", error);
        }
        if let Err(error) = self.publish_rawtx(tx).await {
            warn!("Failed to publish rawtx notification: {}", error);
        }
        if let Err(error) = self.publish_sequence(tx_hash, is_mempool_entry).await {
            warn!("Failed to publish sequence notification: {}", error);
        }
        Ok(())
    }
}
/// Endpoint configuration for ZMQ notifications.
///
/// Each field holds the endpoint for one notification topic; `None` leaves that
/// topic unconfigured. The serde `Serialize`/`Deserialize` impls are derived only
/// when the `zmq` feature is enabled.
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "zmq", derive(serde::Serialize, serde::Deserialize))]
pub struct ZmqConfig {
    /// Endpoint for `hashblock` notifications.
    pub hashblock: Option<String>,
    /// Endpoint for `hashtx` notifications.
    pub hashtx: Option<String>,
    /// Endpoint for `rawblock` notifications.
    pub rawblock: Option<String>,
    /// Endpoint for `rawtx` notifications.
    pub rawtx: Option<String>,
    /// Endpoint for `sequence` notifications.
    pub sequence: Option<String>,
}

impl ZmqConfig {
    /// Returns `true` when at least one topic has an endpoint set.
    ///
    /// Only presence is checked; the endpoint strings themselves are not validated.
    pub fn is_enabled(&self) -> bool {
        let endpoints = [
            &self.hashblock,
            &self.hashtx,
            &self.rawblock,
            &self.rawtx,
            &self.sequence,
        ];
        endpoints.iter().any(|endpoint| endpoint.is_some())
    }
}
/// Stand-in publisher used when the `zmq` feature is disabled.
///
/// It carries no state; its methods mirror the feature-enabled publisher's
/// signatures and do nothing.
#[cfg(not(feature = "zmq"))]
pub struct ZmqPublisher;
// No-op API used when the `zmq` feature is disabled. Every method has the same
// signature as its feature-enabled counterpart and returns `Ok` without doing work.
#[cfg(not(feature = "zmq"))]
impl ZmqPublisher {
/// Returns the stand-in publisher; the configuration is ignored.
///
/// NOTE(review): endpoints set in `_config` are dropped silently here — callers may
/// want to check `ZmqConfig::is_enabled` and report that notifications are off.
pub fn new(_config: &ZmqConfig) -> Result<Self> {
Ok(Self)
}
/// No-op; always returns `Ok(())`.
pub async fn publish_hashblock(&self, _block_hash: &Hash) -> Result<()> {
Ok(())
}
/// No-op; always returns `Ok(())`.
pub async fn publish_hashtx(&self, _tx_hash: &Hash) -> Result<()> {
Ok(())
}
/// No-op; always returns `Ok(())`.
pub async fn publish_rawblock(&self, _block: &Block) -> Result<()> {
Ok(())
}
/// No-op; always returns `Ok(())`.
pub async fn publish_rawtx(&self, _tx: &Transaction) -> Result<()> {
Ok(())
}
/// No-op; always returns `Ok(())`.
pub async fn publish_sequence(&self, _tx_hash: &Hash, _is_mempool_entry: bool) -> Result<()> {
Ok(())
}
/// No-op; always returns `Ok(())`.
pub async fn publish_block(&self, _block: &Block, _block_hash: &Hash) -> Result<()> {
Ok(())
}
/// No-op; always returns `Ok(())`.
pub async fn publish_transaction(
&self,
_tx: &Transaction,
_tx_hash: &Hash,
_is_mempool_entry: bool,
) -> Result<()> {
Ok(())
}
}