#[cfg(all(test, feature = "zmq"))]
mod zmq_tests {
use crate::zmq::{ZmqConfig, ZmqPublisher};
use blvm_protocol::{Block, Hash, Transaction};
use std::time::Duration;
use tokio::time::timeout;
use zeromq::{Context, SUB};
fn create_test_block() -> Block {
use blvm_protocol::BlockHeader;
Block {
header: BlockHeader {
version: 1,
prev_block_hash: [0u8; 32],
merkle_root: [0u8; 32],
timestamp: 1234567890,
bits: 0x1d00ffff,
nonce: 0,
},
transactions: vec![].into_boxed_slice(),
}
}
fn create_test_transaction() -> Transaction {
use blvm_protocol::{tx_inputs, tx_outputs};
Transaction {
version: 1,
inputs: tx_inputs![],
outputs: tx_outputs![],
lock_time: 0,
}
}
#[tokio::test]
async fn test_zmq_publisher_creation() {
let config = ZmqConfig {
hashblock: Some("tcp://127.0.0.1:28332".to_string()),
hashtx: None,
rawblock: None,
rawtx: None,
sequence: None,
};
let publisher = ZmqPublisher::new(&config);
assert!(publisher.is_ok());
}
#[tokio::test]
async fn test_zmq_hashblock_notification() {
let config = ZmqConfig {
hashblock: Some("tcp://127.0.0.1:28333".to_string()),
hashtx: None,
rawblock: None,
rawtx: None,
sequence: None,
};
let publisher = ZmqPublisher::new(&config).unwrap();
let ctx = Context::new();
let subscriber = ctx.socket(SUB).unwrap();
subscriber.connect("tcp://127.0.0.1:28333").unwrap();
subscriber.set_subscribe(b"hashblock").unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let block_hash: Hash = [1u8; 32];
publisher.publish_hashblock(&block_hash).await.unwrap();
let result = timeout(Duration::from_secs(1), async {
let topic = subscriber.recv_msg(0).unwrap();
let data = subscriber.recv_msg(0).unwrap();
(topic, data)
})
.await;
assert!(result.is_ok());
let (topic, data) = result.unwrap();
assert_eq!(topic.as_str(), Some("hashblock"));
assert_eq!(data.len(), 32);
assert_eq!(data.as_ref() as &[u8], block_hash.as_slice());
}
#[tokio::test]
async fn test_zmq_hashtx_notification() {
let config = ZmqConfig {
hashblock: None,
hashtx: Some("tcp://127.0.0.1:28334".to_string()),
rawblock: None,
rawtx: None,
sequence: None,
};
let publisher = ZmqPublisher::new(&config).unwrap();
let ctx = Context::new();
let subscriber = ctx.socket(SUB).unwrap();
subscriber.connect("tcp://127.0.0.1:28334").unwrap();
subscriber.set_subscribe(b"hashtx").unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let tx_hash: Hash = [2u8; 32];
publisher.publish_hashtx(&tx_hash).await.unwrap();
let result = timeout(Duration::from_secs(1), async {
let topic = subscriber.recv_msg(0).unwrap();
let data = subscriber.recv_msg(0).unwrap();
(topic, data)
})
.await;
assert!(result.is_ok());
let (topic, data) = result.unwrap();
assert_eq!(topic.as_str(), Some("hashtx"));
assert_eq!(data.len(), 32);
assert_eq!(data.as_ref() as &[u8], tx_hash.as_slice());
}
#[tokio::test]
async fn test_zmq_rawblock_notification() {
let config = ZmqConfig {
hashblock: None,
hashtx: None,
rawblock: Some("tcp://127.0.0.1:28335".to_string()),
rawtx: None,
sequence: None,
};
let publisher = ZmqPublisher::new(&config).unwrap();
let block = create_test_block();
let ctx = Context::new();
let subscriber = ctx.socket(SUB).unwrap();
subscriber.connect("tcp://127.0.0.1:28335").unwrap();
subscriber.set_subscribe(b"rawblock").unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
publisher.publish_rawblock(&block).await.unwrap();
let result = timeout(Duration::from_secs(1), async {
let topic = subscriber.recv_msg(0).unwrap();
let data = subscriber.recv_msg(0).unwrap();
(topic, data)
})
.await;
assert!(result.is_ok());
let (topic, _data) = result.unwrap();
assert_eq!(topic.as_str(), Some("rawblock"));
assert!(!_data.is_empty());
}
#[tokio::test]
async fn test_zmq_rawtx_notification() {
let config = ZmqConfig {
hashblock: None,
hashtx: None,
rawblock: None,
rawtx: Some("tcp://127.0.0.1:28336".to_string()),
sequence: None,
};
let publisher = ZmqPublisher::new(&config).unwrap();
let tx = create_test_transaction();
let ctx = Context::new();
let subscriber = ctx.socket(SUB).unwrap();
subscriber.connect("tcp://127.0.0.1:28336").unwrap();
subscriber.set_subscribe(b"rawtx").unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
publisher.publish_rawtx(&tx).await.unwrap();
let result = timeout(Duration::from_secs(1), async {
let topic = subscriber.recv_msg(0).unwrap();
let data = subscriber.recv_msg(0).unwrap();
(topic, data)
})
.await;
assert!(result.is_ok());
let (topic, _data) = result.unwrap();
assert_eq!(topic.as_str(), Some("rawtx"));
assert!(!_data.is_empty());
}
#[tokio::test]
async fn test_zmq_sequence_notification() {
let config = ZmqConfig {
hashblock: None,
hashtx: None,
rawblock: None,
rawtx: None,
sequence: Some("tcp://127.0.0.1:28337".to_string()),
};
let publisher = ZmqPublisher::new(&config).unwrap();
let tx_hash: Hash = [3u8; 32];
let ctx = Context::new();
let subscriber = ctx.socket(SUB).unwrap();
subscriber.connect("tcp://127.0.0.1:28337").unwrap();
subscriber.set_subscribe(b"sequence").unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
publisher.publish_sequence(&tx_hash, true).await.unwrap();
let result = timeout(Duration::from_secs(1), async {
let topic = subscriber.recv_msg(0).unwrap();
let data = subscriber.recv_msg(0).unwrap();
(topic, data)
})
.await;
assert!(result.is_ok());
let (topic, data) = result.unwrap();
assert_eq!(topic.as_str(), Some("sequence"));
assert_eq!(data.len(), 33); let data_bytes: &[u8] = data.as_ref();
assert_eq!(data_bytes[0], 0x01); assert_eq!(&data_bytes[1..], tx_hash.as_slice());
}
#[tokio::test]
async fn test_zmq_config_is_enabled() {
let config = ZmqConfig {
hashblock: Some("tcp://127.0.0.1:28332".to_string()),
hashtx: None,
rawblock: None,
rawtx: None,
sequence: None,
};
assert!(config.is_enabled());
let config = ZmqConfig::default();
assert!(!config.is_enabled());
}
#[tokio::test]
async fn test_zmq_publish_block_convenience() {
let config = ZmqConfig {
hashblock: Some("tcp://127.0.0.1:28338".to_string()),
hashtx: None,
rawblock: Some("tcp://127.0.0.1:28339".to_string()),
rawtx: None,
sequence: None,
};
let publisher = ZmqPublisher::new(&config).unwrap();
let block = create_test_block();
let block_hash: Hash = [4u8; 32];
let ctx = Context::new();
let hash_sub = ctx.socket(SUB).unwrap();
hash_sub.connect("tcp://127.0.0.1:28338").unwrap();
hash_sub.set_subscribe(b"hashblock").unwrap();
let raw_sub = ctx.socket(SUB).unwrap();
raw_sub.connect("tcp://127.0.0.1:28339").unwrap();
raw_sub.set_subscribe(b"rawblock").unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
publisher.publish_block(&block, &block_hash).await.unwrap();
let hash_result = timeout(Duration::from_secs(1), async {
hash_sub.recv_msg(0).unwrap();
hash_sub.recv_msg(0).unwrap();
})
.await;
let raw_result = timeout(Duration::from_secs(1), async {
raw_sub.recv_msg(0).unwrap();
raw_sub.recv_msg(0).unwrap();
})
.await;
assert!(hash_result.is_ok());
assert!(raw_result.is_ok());
}
}