use crate::network::transport::TransportType;
use anyhow::Result;
use blvm_protocol::network::NetworkMessage as ConsensusNetworkMessage;
#[cfg(feature = "production")]
use std::collections::hash_map::DefaultHasher;
#[cfg(feature = "production")]
use std::hash::{Hash, Hasher};
#[cfg(feature = "production")]
use std::sync::{OnceLock, RwLock};
#[cfg(feature = "production")]
static SERIALIZATION_CACHE: OnceLock<RwLock<blvm_protocol::lru::LruCache<u64, Vec<u8>>>> =
OnceLock::new();
#[cfg(feature = "production")]
fn get_serialization_cache() -> &'static RwLock<blvm_protocol::lru::LruCache<u64, Vec<u8>>> {
SERIALIZATION_CACHE.get_or_init(|| {
use blvm_protocol::lru::LruCache;
use std::num::NonZeroUsize;
RwLock::new(LruCache::new(NonZeroUsize::new(5_000).unwrap()))
})
}
#[cfg(feature = "production")]
fn calculate_message_cache_key(msg: &ConsensusNetworkMessage, transport: TransportType) -> u64 {
let mut hasher = DefaultHasher::new();
std::mem::discriminant(msg).hash(&mut hasher);
std::mem::discriminant(&transport).hash(&mut hasher);
hasher.finish()
}
pub struct ProtocolAdapter;
impl ProtocolAdapter {
pub fn serialize_message(
msg: &ConsensusNetworkMessage,
transport: TransportType,
) -> Result<Vec<u8>> {
#[cfg(feature = "production")]
{
let cache = get_serialization_cache();
let cache_key = calculate_message_cache_key(msg, transport);
if let Ok(cached) = cache.read() {
if let Some(serialized) = cached.peek(&cache_key) {
return Ok(serialized.clone()); }
}
let serialized = Self::serialize_message_inner(msg, transport)?;
if let Ok(mut cache) = cache.write() {
cache.put(cache_key, serialized.clone());
}
Ok(serialized)
}
#[cfg(not(feature = "production"))]
{
Self::serialize_message_inner(msg, transport)
}
}
fn serialize_message_inner(
msg: &ConsensusNetworkMessage,
transport: TransportType,
) -> Result<Vec<u8>> {
match transport {
TransportType::Tcp => Self::serialize_bitcoin_wire_format(msg),
#[cfg(feature = "quinn")]
TransportType::Quinn => {
Self::serialize_iroh_format(msg)
}
#[cfg(feature = "iroh")]
TransportType::Iroh => Self::serialize_iroh_format(msg),
}
}
pub fn deserialize_message(
data: &[u8],
transport: TransportType,
) -> Result<ConsensusNetworkMessage> {
match transport {
TransportType::Tcp => Self::deserialize_bitcoin_wire_format(data),
#[cfg(feature = "quinn")]
TransportType::Quinn => {
Self::deserialize_iroh_format(data)
}
#[cfg(feature = "iroh")]
TransportType::Iroh => Self::deserialize_iroh_format(data),
}
}
fn serialize_bitcoin_wire_format(msg: &ConsensusNetworkMessage) -> Result<Vec<u8>> {
let protocol_msg = Self::consensus_to_protocol_message(msg)?;
let payload = match &protocol_msg {
crate::network::protocol::ProtocolMessage::Version(v) => {
use blvm_protocol::network::{NetworkAddress, VersionMessage};
use blvm_protocol::wire::serialize_version;
let version_msg = VersionMessage {
version: v.version as u32,
services: v.services,
timestamp: v.timestamp,
addr_recv: NetworkAddress {
services: v.addr_recv.services,
ip: v.addr_recv.ip,
port: v.addr_recv.port,
},
addr_from: NetworkAddress {
services: v.addr_from.services,
ip: v.addr_from.ip,
port: v.addr_from.port,
},
nonce: v.nonce,
user_agent: v.user_agent.clone(),
start_height: v.start_height,
relay: v.relay,
};
serialize_version(&version_msg)?
}
crate::network::protocol::ProtocolMessage::Verack => {
vec![]
}
crate::network::protocol::ProtocolMessage::Ping(p) => bincode::serialize(p)?,
crate::network::protocol::ProtocolMessage::Pong(p) => bincode::serialize(p)?,
crate::network::protocol::ProtocolMessage::AddrV2(a) => {
blvm_protocol::wire::serialize_addrv2(a).map_err(|e| anyhow::anyhow!("{e}"))?
}
_ => {
return Err(anyhow::anyhow!(
"Unsupported message type for serialization"
));
}
};
let command = Self::message_to_command(msg);
let mut command_bytes = [0u8; 12];
command_bytes[..command.len().min(12)].copy_from_slice(command.as_bytes());
#[cfg(feature = "production")]
let checksum_bytes = {
use blvm_consensus::crypto::OptimizedSha256;
let hasher = OptimizedSha256::new();
hasher.hash256(&payload)
};
#[cfg(feature = "production")]
let checksum = &checksum_bytes[..4];
#[cfg(not(feature = "production"))]
let checksum_bytes = {
use sha2::{Digest, Sha256};
let hash1 = Sha256::digest(&payload);
let hash2 = Sha256::digest(hash1);
&hash2[..4]
};
let mut message = Vec::new();
message.extend_from_slice(&0xd9b4bef9u32.to_le_bytes());
message.extend_from_slice(&command_bytes);
message.extend_from_slice(&(payload.len() as u32).to_le_bytes());
message.extend_from_slice(checksum);
message.extend_from_slice(&payload);
Ok(message)
}
fn deserialize_bitcoin_wire_format(data: &[u8]) -> Result<ConsensusNetworkMessage> {
use crate::network::protocol::ProtocolParser;
let protocol_msg = ProtocolParser::parse_message(data)?;
Self::protocol_to_consensus_message(&protocol_msg)
}
#[cfg(any(feature = "iroh", feature = "quinn"))]
fn serialize_iroh_format(msg: &ConsensusNetworkMessage) -> Result<Vec<u8>> {
let protocol_msg = Self::consensus_to_protocol_message(msg)?;
bincode::serialize(&protocol_msg)
.map_err(|e| anyhow::anyhow!("Failed to serialize message: {}", e))
}
#[cfg(any(feature = "iroh", feature = "quinn"))]
fn deserialize_iroh_format(data: &[u8]) -> Result<ConsensusNetworkMessage> {
let protocol_msg: crate::network::protocol::ProtocolMessage = bincode::deserialize(data)
.map_err(|e| anyhow::anyhow!("Failed to deserialize message: {}", e))?;
Self::protocol_to_consensus_message(&protocol_msg)
}
fn consensus_to_protocol_message(
msg: &ConsensusNetworkMessage,
) -> Result<crate::network::protocol::ProtocolMessage> {
use crate::network::protocol::{
NetworkAddress as ProtoNetworkAddress, PingMessage as ProtoPingMessage,
PongMessage as ProtoPongMessage, ProtocolMessage,
VersionMessage as ProtoVersionMessage,
};
match msg {
ConsensusNetworkMessage::Version(v) => {
Ok(ProtocolMessage::Version(ProtoVersionMessage {
version: v.version as i32,
services: v.services,
timestamp: v.timestamp,
addr_recv: ProtoNetworkAddress {
services: v.addr_recv.services,
ip: v.addr_recv.ip,
port: v.addr_recv.port,
},
addr_from: ProtoNetworkAddress {
services: v.addr_from.services,
ip: v.addr_from.ip,
port: v.addr_from.port,
},
nonce: v.nonce,
user_agent: v.user_agent.clone(),
start_height: v.start_height,
relay: v.relay,
}))
}
ConsensusNetworkMessage::VerAck => Ok(ProtocolMessage::Verack),
ConsensusNetworkMessage::Ping(p) => {
Ok(ProtocolMessage::Ping(ProtoPingMessage { nonce: p.nonce }))
}
ConsensusNetworkMessage::Pong(p) => {
Ok(ProtocolMessage::Pong(ProtoPongMessage { nonce: p.nonce }))
}
ConsensusNetworkMessage::AddrV2(a) => Ok(ProtocolMessage::AddrV2(a.clone())),
_ => Err(anyhow::anyhow!(
"Unsupported message type for protocol conversion"
)),
}
}
pub fn protocol_to_consensus_message(
msg: &crate::network::protocol::ProtocolMessage,
) -> Result<ConsensusNetworkMessage> {
use crate::network::protocol::ProtocolMessage;
use blvm_protocol::network::{
NetworkAddress as ConsensusNetworkAddress, PingMessage as ConsensusPingMessage,
PongMessage as ConsensusPongMessage, VersionMessage as ConsensusVersionMessage,
};
match msg {
ProtocolMessage::Version(v) => {
Ok(ConsensusNetworkMessage::Version(ConsensusVersionMessage {
version: v.version as u32,
services: v.services,
timestamp: v.timestamp,
addr_recv: ConsensusNetworkAddress {
services: v.addr_recv.services,
ip: v.addr_recv.ip,
port: v.addr_recv.port,
},
addr_from: ConsensusNetworkAddress {
services: v.addr_from.services,
ip: v.addr_from.ip,
port: v.addr_from.port,
},
nonce: v.nonce,
user_agent: v.user_agent.clone(),
start_height: v.start_height,
relay: v.relay,
}))
}
ProtocolMessage::Verack => Ok(ConsensusNetworkMessage::VerAck),
ProtocolMessage::Ping(p) => Ok(ConsensusNetworkMessage::Ping(ConsensusPingMessage {
nonce: p.nonce,
})),
ProtocolMessage::Pong(p) => Ok(ConsensusNetworkMessage::Pong(ConsensusPongMessage {
nonce: p.nonce,
})),
ProtocolMessage::AddrV2(a) => Ok(ConsensusNetworkMessage::AddrV2(a.clone())),
_ => Err(anyhow::anyhow!(
"Unsupported message type for consensus conversion"
)),
}
}
fn message_to_command(msg: &ConsensusNetworkMessage) -> &'static str {
match msg {
ConsensusNetworkMessage::Version(_) => "version",
ConsensusNetworkMessage::VerAck => "verack",
ConsensusNetworkMessage::Addr(_) => "addr",
ConsensusNetworkMessage::AddrV2(_) => "addrv2",
ConsensusNetworkMessage::Inv(_) => "inv",
ConsensusNetworkMessage::GetData(_) => "getdata",
ConsensusNetworkMessage::GetHeaders(_) => "getheaders",
ConsensusNetworkMessage::Headers(_) => "headers",
ConsensusNetworkMessage::Block(_) => "block",
ConsensusNetworkMessage::Tx(_) => "tx",
ConsensusNetworkMessage::Ping(_) => "ping",
ConsensusNetworkMessage::Pong(_) => "pong",
ConsensusNetworkMessage::MemPool => "mempool",
ConsensusNetworkMessage::FeeFilter(_) => "feefilter",
ConsensusNetworkMessage::GetBlocks(_) => "getblocks",
ConsensusNetworkMessage::GetAddr => "getaddr",
ConsensusNetworkMessage::NotFound(_) => "notfound",
ConsensusNetworkMessage::Reject(_) => "reject",
ConsensusNetworkMessage::SendHeaders => "sendheaders",
ConsensusNetworkMessage::SendCmpct(_) => "sendcmpct",
ConsensusNetworkMessage::CmpctBlock(_) => "cmpctblock",
ConsensusNetworkMessage::GetBlockTxn(_) => "getblocktxn",
ConsensusNetworkMessage::BlockTxn(_) => "blocktxn",
#[cfg(feature = "utxo-commitments")]
ConsensusNetworkMessage::GetUTXOSet(_) => "getutxoset",
#[cfg(feature = "utxo-commitments")]
ConsensusNetworkMessage::UTXOSet(_) => "utxoset",
#[cfg(feature = "utxo-commitments")]
ConsensusNetworkMessage::GetFilteredBlock(_) => "getfilteredblock",
#[cfg(feature = "utxo-commitments")]
ConsensusNetworkMessage::FilteredBlock(_) => "filteredblock",
ConsensusNetworkMessage::GetBanList(_) => "getbanlist",
ConsensusNetworkMessage::BanList(_) => "banlist",
}
}
}