use crate::error::{BitcoinError, Result};
use bitcoin::consensus::Decodable;
use bitcoin::{Block, Transaction};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ZmqTopic {
HashBlock,
HashTx,
RawBlock,
RawTx,
Sequence,
}
impl ZmqTopic {
pub fn as_str(&self) -> &'static str {
match self {
ZmqTopic::HashBlock => "hashblock",
ZmqTopic::HashTx => "hashtx",
ZmqTopic::RawBlock => "rawblock",
ZmqTopic::RawTx => "rawtx",
ZmqTopic::Sequence => "sequence",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ZmqConfig {
pub hashblock_endpoint: Option<String>,
pub hashtx_endpoint: Option<String>,
pub rawblock_endpoint: Option<String>,
pub rawtx_endpoint: Option<String>,
pub sequence_endpoint: Option<String>,
pub buffer_size: usize,
pub connection_timeout_ms: u64,
}
impl ZmqConfig {
pub fn new() -> Self {
Self {
hashblock_endpoint: None,
hashtx_endpoint: None,
rawblock_endpoint: None,
rawtx_endpoint: None,
sequence_endpoint: None,
buffer_size: 1000,
connection_timeout_ms: 5000,
}
}
pub fn with_block_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.hashblock_endpoint = Some(endpoint.into());
self
}
pub fn with_tx_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.hashtx_endpoint = Some(endpoint.into());
self
}
pub fn with_raw_block_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.rawblock_endpoint = Some(endpoint.into());
self
}
pub fn with_raw_tx_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.rawtx_endpoint = Some(endpoint.into());
self
}
pub fn with_sequence_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.sequence_endpoint = Some(endpoint.into());
self
}
pub fn with_buffer_size(mut self, size: usize) -> Self {
self.buffer_size = size;
self
}
}
impl Default for ZmqConfig {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ZmqNotification {
Block {
hash: String,
sequence: u32,
},
Transaction {
txid: String,
sequence: u32,
},
RawBlock {
hash: String,
block: Vec<u8>,
sequence: u32,
},
RawTransaction {
txid: String,
tx: Vec<u8>,
sequence: u32,
},
Sequence {
event_type: SequenceEvent,
hash: String,
sequence: u32,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SequenceEvent {
BlockConnected,
BlockDisconnected,
TransactionAdded,
TransactionRemoved,
}
pub struct ZmqClient {
config: Arc<ZmqConfig>,
receiver: mpsc::Receiver<ZmqNotification>,
#[allow(dead_code)]
shutdown_tx: mpsc::Sender<()>,
}
impl ZmqClient {
pub async fn new(config: ZmqConfig) -> Result<Self> {
let (tx, rx) = mpsc::channel(config.buffer_size);
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
let config_arc = Arc::new(config);
let config_clone = config_arc.clone();
tokio::spawn(async move {
let _ = &config_clone;
let _ = &tx;
let _ = shutdown_rx.recv().await;
});
Ok(Self {
config: config_arc,
receiver: rx,
shutdown_tx,
})
}
pub async fn recv(&mut self) -> Option<ZmqNotification> {
self.receiver.recv().await
}
pub fn try_recv(&mut self) -> Option<ZmqNotification> {
self.receiver.try_recv().ok()
}
pub fn config(&self) -> &ZmqConfig {
&self.config
}
}
#[async_trait::async_trait]
pub trait ZmqHandler: Send + Sync {
async fn on_block(&self, hash: String, sequence: u32) -> Result<()>;
async fn on_transaction(&self, txid: String, sequence: u32) -> Result<()>;
async fn on_raw_block(&self, hash: String, block: Block, sequence: u32) -> Result<()>;
async fn on_raw_transaction(&self, txid: String, tx: Transaction, sequence: u32) -> Result<()>;
async fn on_sequence(&self, event: SequenceEvent, hash: String, sequence: u32) -> Result<()>;
}
pub struct ZmqProcessor {
client: ZmqClient,
handlers: Vec<Arc<dyn ZmqHandler>>,
}
impl ZmqProcessor {
pub fn new(client: ZmqClient) -> Self {
Self {
client,
handlers: Vec::new(),
}
}
pub fn add_handler(&mut self, handler: Arc<dyn ZmqHandler>) {
self.handlers.push(handler);
}
pub async fn run(&mut self) -> Result<()> {
while let Some(notification) = self.client.recv().await {
self.handle_notification(notification).await?;
}
Ok(())
}
async fn handle_notification(&self, notification: ZmqNotification) -> Result<()> {
for handler in &self.handlers {
match ¬ification {
ZmqNotification::Block { hash, sequence } => {
handler.on_block(hash.clone(), *sequence).await?;
}
ZmqNotification::Transaction { txid, sequence } => {
handler.on_transaction(txid.clone(), *sequence).await?;
}
ZmqNotification::RawBlock {
hash,
block,
sequence,
} => {
let parsed_block = Block::consensus_decode(&mut &block[..]).map_err(|e| {
BitcoinError::InvalidTransaction(format!("Failed to parse block: {}", e))
})?;
handler
.on_raw_block(hash.clone(), parsed_block, *sequence)
.await?;
}
ZmqNotification::RawTransaction { txid, tx, sequence } => {
let parsed_tx = Transaction::consensus_decode(&mut &tx[..]).map_err(|e| {
BitcoinError::InvalidTransaction(format!("Failed to parse tx: {}", e))
})?;
handler
.on_raw_transaction(txid.clone(), parsed_tx, *sequence)
.await?;
}
ZmqNotification::Sequence {
event_type,
hash,
sequence,
} => {
handler
.on_sequence(*event_type, hash.clone(), *sequence)
.await?;
}
}
}
Ok(())
}
}
pub struct BlockMonitor {
on_block: Option<Box<dyn Fn(String, u32) + Send + Sync>>,
}
impl BlockMonitor {
pub fn new() -> Self {
Self { on_block: None }
}
pub fn on_block<F>(mut self, callback: F) -> Self
where
F: Fn(String, u32) + Send + Sync + 'static,
{
self.on_block = Some(Box::new(callback));
self
}
}
impl Default for BlockMonitor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ZmqHandler for BlockMonitor {
async fn on_block(&self, hash: String, sequence: u32) -> Result<()> {
if let Some(callback) = &self.on_block {
callback(hash, sequence);
}
Ok(())
}
async fn on_transaction(&self, _txid: String, _sequence: u32) -> Result<()> {
Ok(())
}
async fn on_raw_block(&self, _hash: String, _block: Block, _sequence: u32) -> Result<()> {
Ok(())
}
async fn on_raw_transaction(
&self,
_txid: String,
_tx: Transaction,
_sequence: u32,
) -> Result<()> {
Ok(())
}
async fn on_sequence(
&self,
_event: SequenceEvent,
_hash: String,
_sequence: u32,
) -> Result<()> {
Ok(())
}
}
pub struct TransactionMonitor {
on_tx: Option<Box<dyn Fn(String, u32) + Send + Sync>>,
}
impl TransactionMonitor {
pub fn new() -> Self {
Self { on_tx: None }
}
pub fn on_transaction<F>(mut self, callback: F) -> Self
where
F: Fn(String, u32) + Send + Sync + 'static,
{
self.on_tx = Some(Box::new(callback));
self
}
}
impl Default for TransactionMonitor {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ZmqHandler for TransactionMonitor {
async fn on_block(&self, _hash: String, _sequence: u32) -> Result<()> {
Ok(())
}
async fn on_transaction(&self, txid: String, sequence: u32) -> Result<()> {
if let Some(callback) = &self.on_tx {
callback(txid, sequence);
}
Ok(())
}
async fn on_raw_block(&self, _hash: String, _block: Block, _sequence: u32) -> Result<()> {
Ok(())
}
async fn on_raw_transaction(
&self,
_txid: String,
_tx: Transaction,
_sequence: u32,
) -> Result<()> {
Ok(())
}
async fn on_sequence(
&self,
_event: SequenceEvent,
_hash: String,
_sequence: u32,
) -> Result<()> {
Ok(())
}
}
pub struct ReorgDetector {
on_reorg: Option<Box<dyn Fn(String, u32) + Send + Sync>>,
}
impl ReorgDetector {
pub fn new() -> Self {
Self { on_reorg: None }
}
pub fn on_reorg<F>(mut self, callback: F) -> Self
where
F: Fn(String, u32) + Send + Sync + 'static,
{
self.on_reorg = Some(Box::new(callback));
self
}
}
impl Default for ReorgDetector {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ZmqHandler for ReorgDetector {
async fn on_block(&self, _hash: String, _sequence: u32) -> Result<()> {
Ok(())
}
async fn on_transaction(&self, _txid: String, _sequence: u32) -> Result<()> {
Ok(())
}
async fn on_raw_block(&self, _hash: String, _block: Block, _sequence: u32) -> Result<()> {
Ok(())
}
async fn on_raw_transaction(
&self,
_txid: String,
_tx: Transaction,
_sequence: u32,
) -> Result<()> {
Ok(())
}
async fn on_sequence(&self, event: SequenceEvent, hash: String, sequence: u32) -> Result<()> {
if event == SequenceEvent::BlockDisconnected {
if let Some(callback) = &self.on_reorg {
callback(hash, sequence);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_zmq_topic_string() {
assert_eq!(ZmqTopic::HashBlock.as_str(), "hashblock");
assert_eq!(ZmqTopic::HashTx.as_str(), "hashtx");
assert_eq!(ZmqTopic::RawBlock.as_str(), "rawblock");
assert_eq!(ZmqTopic::RawTx.as_str(), "rawtx");
assert_eq!(ZmqTopic::Sequence.as_str(), "sequence");
}
#[test]
fn test_zmq_config_builder() {
let config = ZmqConfig::new()
.with_block_endpoint("tcp://127.0.0.1:28332")
.with_tx_endpoint("tcp://127.0.0.1:28333")
.with_buffer_size(500);
assert_eq!(
config.hashblock_endpoint,
Some("tcp://127.0.0.1:28332".to_string())
);
assert_eq!(
config.hashtx_endpoint,
Some("tcp://127.0.0.1:28333".to_string())
);
assert_eq!(config.buffer_size, 500);
}
#[test]
fn test_sequence_event() {
assert_eq!(SequenceEvent::BlockConnected, SequenceEvent::BlockConnected);
assert_ne!(
SequenceEvent::BlockConnected,
SequenceEvent::BlockDisconnected
);
}
#[test]
fn test_zmq_notification() {
let notification = ZmqNotification::Block {
hash: "abc123".to_string(),
sequence: 100,
};
match notification {
ZmqNotification::Block { hash, sequence } => {
assert_eq!(hash, "abc123");
assert_eq!(sequence, 100);
}
_ => panic!("Wrong notification type"),
}
}
#[test]
fn test_block_monitor_creation() {
let _monitor = BlockMonitor::new().on_block(|hash, seq| {
println!("Block: {} at seq {}", hash, seq);
});
}
#[test]
fn test_transaction_monitor_creation() {
let _monitor = TransactionMonitor::new().on_transaction(|txid, seq| {
println!("Transaction: {} at seq {}", txid, seq);
});
}
#[test]
fn test_reorg_detector_creation() {
let _detector = ReorgDetector::new().on_reorg(|hash, seq| {
println!("Reorg detected: {} at seq {}", hash, seq);
});
}
}