#![cfg(feature = "fibre")]
use blvm_protocol::fibre::{FecChunk, FibreCapabilities, FibreProtocolError, FIBRE_MAGIC};
use blvm_protocol::{Block, Hash};
pub use blvm_protocol::fibre::FibreConfig;
use reed_solomon_erasure::{galois_8::Field, ReedSolomon};
use sha2::Digest;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
/// Relay implementing FIBRE-style fast block propagation: blocks are
/// FEC-encoded into UDP-sized chunks, fanned out to registered peers, and
/// reassembled from incoming chunks on the receive side.
pub struct FibreRelay {
    /// Cache of already-encoded blocks, keyed by block hash; entries expire
    /// after `cache_ttl`.
    encoded_blocks: HashMap<Hash, EncodedBlock>,
    /// Known FIBRE-capable peers, keyed by peer id.
    fibre_peers: HashMap<String, FibrePeerInfo>,
    /// How long entries in `encoded_blocks` remain valid.
    cache_ttl: Duration,
    /// Lazily-created FEC encoder, reused across blocks.
    fec_encoder: Option<FecEncoder>,
    /// UDP transport; present only after `initialize_udp` has been called.
    udp_transport: Option<Arc<Mutex<UdpTransport>>>,
    /// Partially-received blocks awaiting enough chunks to reconstruct.
    receiving_blocks: HashMap<Hash, BlockAssembly>,
    /// Wrapping per-block sequence number stamped on each encoded block.
    sequence_counter: u64,
    /// Tunables (parity ratio, retry limits, assembly caps, ...).
    config: FibreConfig,
    /// Channel used to hand fully-assembled blocks to the network layer.
    message_tx: Option<tokio::sync::mpsc::UnboundedSender<crate::network::NetworkMessage>>,
    /// Shared running counters, snapshotted by `get_stats`.
    stats: Arc<Mutex<FibreStatsInternal>>,
}
/// Raw running counters behind `FibreRelay::stats`; aggregated into the
/// public `FibreStats` snapshot by `get_stats`.
#[derive(Debug, Clone)]
struct FibreStatsInternal {
    /// Blocks whose chunks were all sent without error.
    blocks_sent: u64,
    /// Blocks fully reassembled from received chunks.
    /// NOTE(review): never incremented anywhere in this file — confirm whether
    /// the increment lives elsewhere or is missing.
    blocks_received: u64,
    /// Chunks handed to the UDP socket (successful or not).
    chunks_sent: u64,
    /// Chunks received from peers.
    chunks_received: u64,
    /// Chunks resent after a timeout.
    /// NOTE(review): the retry handler does not update this counter — confirm.
    chunks_retransmitted: u64,
    /// Blocks reconstructed despite missing chunks.
    fec_recoveries: u64,
    /// Send failures recorded by `send_block`.
    udp_errors: u64,
    /// Sum of per-send latencies (ms) for successful sends.
    total_latency_ms: f64,
    /// Number of successful `send_block` calls.
    total_successful_sends: u64,
    /// Number of `send_block` calls, successful or not.
    total_send_attempts: u64,
}
/// Reed-Solomon erasure coder with a fixed shard geometry.
#[derive(Clone)]
struct FecEncoder {
    /// Underlying GF(2^8) Reed-Solomon codec.
    encoder: ReedSolomon<Field>,
    /// Number of data shards the payload is split into.
    data_shards: usize,
    /// Number of parity shards appended for recovery.
    parity_shards: usize,
    /// Fixed size in bytes of every shard (data is zero-padded up to this).
    shard_size: usize,
}
impl FecEncoder {
    /// Creates a Reed-Solomon coder for `data_shards` data shards plus
    /// `parity_shards` parity shards, each exactly `shard_size` bytes.
    ///
    /// # Errors
    /// Returns `FibreError::FecError` when the shard counts are rejected by
    /// the underlying codec (e.g. zero data shards).
    fn new(
        data_shards: usize,
        parity_shards: usize,
        shard_size: usize,
    ) -> Result<Self, FibreError> {
        let encoder = ReedSolomon::new(data_shards, parity_shards)
            .map_err(|e| FibreError::FecError(format!("Failed to create encoder: {e}")))?;
        Ok(Self {
            encoder,
            data_shards,
            parity_shards,
            shard_size,
        })
    }

    /// Splits `data` into `data_shards` fixed-size shards (zero-padding the
    /// tail) and appends `parity_shards` parity shards computed over them.
    ///
    /// The returned vector holds data shards first, then parity shards.
    fn encode(&self, data: &[u8]) -> Result<Vec<Vec<u8>>, FibreError> {
        let original_len = data.len();
        let total_shards = self.data_shards + self.parity_shards;
        let mut shards = Vec::with_capacity(total_shards);
        for i in 0..self.data_shards {
            let start = i * self.shard_size;
            let end = (start + self.shard_size).min(original_len);
            let mut shard = Vec::with_capacity(self.shard_size);
            if start < original_len {
                shard.extend_from_slice(&data[start..end]);
            }
            // Every shard must be exactly `shard_size` bytes for the codec.
            shard.resize(self.shard_size, 0);
            shards.push(shard);
        }
        // Parity shards start zeroed; `encode` fills them in place.
        for _ in 0..self.parity_shards {
            shards.push(vec![0u8; self.shard_size]);
        }
        self.encoder
            .encode(&mut shards)
            .map_err(|e| FibreError::FecError(format!("Encoding failed: {e}")))?;
        Ok(shards)
    }

    /// Reconstructs the original (zero-padded) payload from a possibly
    /// incomplete shard set.
    ///
    /// Missing shards must be `None`; at least `data_shards` shards have to
    /// be present for recovery to succeed.
    fn decode(&self, shards: &[Option<Vec<u8>>]) -> Result<Vec<u8>, FibreError> {
        if shards.len() != self.data_shards + self.parity_shards {
            return Err(FibreError::FecError(format!(
                "Invalid shard count: expected {}, got {}",
                self.data_shards + self.parity_shards,
                shards.len()
            )));
        }
        // Undersized shards are padded up to the codec's fixed shard size, but
        // missing shards MUST stay `None`: `reconstruct` treats every `Some`
        // shard as valid data, so zero-filling absent shards (as this code
        // previously did) presented garbage as real shards and made recovery
        // of lost chunks impossible.
        let mut shard_vec: Vec<Option<Vec<u8>>> = shards
            .iter()
            .map(|slot| {
                slot.as_ref().map(|s| {
                    let mut shard = s.clone();
                    if shard.len() < self.shard_size {
                        shard.resize(self.shard_size, 0);
                    }
                    shard
                })
            })
            .collect();
        self.encoder
            .reconstruct(&mut shard_vec)
            .map_err(|e| FibreError::FecError(format!("Decoding failed: {e}")))?;
        let mut data = Vec::with_capacity(self.data_shards * self.shard_size);
        for s in &shard_vec[..self.data_shards] {
            let Some(shard) = s.as_ref() else {
                return Err(FibreError::FecError(
                    "FEC reconstruct did not yield all data shards".to_string(),
                ));
            };
            data.extend_from_slice(shard);
        }
        Ok(data)
    }
}
/// A block together with its FEC-encoded chunk representation, cached so
/// repeated relays of the same block skip re-encoding.
#[derive(Debug, Clone)]
pub struct EncodedBlock {
    /// Double-SHA-256 of the 80-byte serialized block header.
    pub block_hash: Hash,
    /// The original block.
    pub block: Block,
    /// All FEC chunks (data shards first, then parity shards).
    pub chunks: Vec<FecChunk>,
    /// Total number of chunks (`chunks.len()`).
    pub chunk_count: u32,
    /// When this entry was created; used for TTL-based cache expiry.
    pub encoded_at: Instant,
    /// How many of the chunks are data shards (the minimum needed to decode).
    pub data_chunks: u32,
}
/// In-progress reassembly of one block from incoming FEC chunks.
#[derive(Clone)]
struct BlockAssembly {
    /// Block hash announced by the chunks being collected.
    block_hash: Hash,
    /// Chunks received so far, keyed by chunk index.
    received_chunks: HashMap<u32, FecChunk>,
    /// Expected total number of chunks (data + parity).
    total_chunks: u32,
    /// Number of data chunks; reconstruction starts once this many arrive.
    data_chunks: u32,
    /// When the assembly was created; stale assemblies are evicted.
    received_at: Instant,
    /// Decoder matching this block's shard geometry.
    fec_encoder: FecEncoder,
}
/// Metadata about a peer that participates in FIBRE relay.
#[derive(Debug, Clone)]
pub struct FibrePeerInfo {
    /// Opaque peer identifier used by the network layer.
    pub peer_id: String,
    /// UDP endpoint for chunk delivery, if the peer advertised one.
    pub udp_addr: Option<SocketAddr>,
    /// Peer capabilities (defaults are assumed at registration time).
    pub capabilities: FibreCapabilities,
    /// Last time a block was successfully relayed to this peer.
    pub last_relay: Option<Instant>,
}
/// Shared UDP socket plus per-peer connection bookkeeping.
struct UdpTransport {
    /// Socket shared with the receiver and retry background tasks.
    socket: Arc<UdpSocket>,
    /// Address the socket is actually bound to.
    local_addr: SocketAddr,
    /// Per-peer connection state, shared with background tasks.
    connections: Arc<Mutex<HashMap<SocketAddr, UdpConnection>>>,
    /// Bind address and retry tuning.
    config: UdpTransportConfig,
}
/// Soft "connection" state for one UDP peer (UDP itself is connectionless).
struct UdpConnection {
    /// Remote endpoint this state belongs to.
    peer_addr: SocketAddr,
    /// Last time any traffic was exchanged; idle connections are reaped.
    last_seen: Instant,
    /// Wrapping count of chunks sent to this peer.
    out_sequence: u64,
    /// Highest block sequence number seen from this peer.
    in_sequence: u64,
    /// Chunks awaiting retransmission, keyed by block sequence number.
    pending_chunks: HashMap<u64, PendingChunk>,
}
/// A sent chunk tracked for timeout-based retransmission.
#[derive(Clone)]
struct PendingChunk {
    /// The chunk to retransmit verbatim.
    chunk: FecChunk,
    /// Time of the most recent (re)transmission.
    sent_at: Instant,
    /// Number of retries performed so far (capped by `max_retries`).
    retry_count: u32,
}
/// Tuning for the UDP transport.
#[derive(Clone)]
struct UdpTransportConfig {
    /// How long a pending chunk may wait before being retransmitted.
    chunk_timeout: Duration,
    /// Maximum retransmissions per pending chunk; 0 disables tracking.
    max_retries: u32,
    /// Local address to bind the UDP socket to.
    bind_addr: SocketAddr,
}
impl UdpTransport {
    /// Binds a UDP socket at `config.bind_addr` and prepares an empty
    /// per-peer connection table.
    async fn bind(config: UdpTransportConfig) -> Result<Self, FibreError> {
        let socket = UdpSocket::bind(config.bind_addr)
            .await
            .map_err(|e| FibreError::UdpError(format!("Failed to bind UDP socket: {e}")))?;
        let local_addr = socket
            .local_addr()
            .map_err(|e| FibreError::UdpError(format!("Failed to get local address: {e}")))?;
        Ok(Self {
            socket: Arc::new(socket),
            local_addr,
            connections: Arc::new(Mutex::new(HashMap::new())),
            config,
        })
    }

    /// Serializes and sends a single FEC chunk to `peer`, updating the
    /// connection bookkeeping and (when retries are enabled) recording the
    /// chunk as pending so the retry handler can retransmit it.
    async fn send_chunk(&self, peer: SocketAddr, chunk: FecChunk) -> Result<(), FibreError> {
        let packet = chunk
            .serialize()
            .map_err(|e| FibreError::UdpError(format!("Failed to serialize chunk: {e}")))?;
        if packet.len() > blvm_protocol::fibre::MAX_PACKET_SIZE {
            return Err(FibreError::UdpError(format!(
                "Packet too large: {} bytes",
                packet.len()
            )));
        }
        self.socket
            .send_to(&packet, peer)
            .await
            .map_err(|e| FibreError::UdpError(format!("Failed to send UDP packet: {e}")))?;
        let mut connections = self.connections.lock().await;
        let conn = connections.entry(peer).or_insert_with(|| UdpConnection {
            peer_addr: peer,
            last_seen: Instant::now(),
            out_sequence: 0,
            in_sequence: 0,
            pending_chunks: HashMap::new(),
        });
        conn.last_seen = Instant::now();
        conn.out_sequence = conn.out_sequence.wrapping_add(1);
        if self.config.max_retries > 0 {
            // NOTE(review): `chunk.sequence` is shared by every chunk of one
            // block, so pending entries for sibling chunks of the same block
            // overwrite each other here; a per-chunk key would be needed for
            // full retransmission coverage — confirm against the ack/removal
            // path before changing the key.
            conn.pending_chunks.insert(
                chunk.sequence,
                PendingChunk {
                    chunk: chunk.clone(),
                    sent_at: Instant::now(),
                    retry_count: 0,
                },
            );
        }
        Ok(())
    }

    /// Drops the pending-retransmission entry for `sequence` on `peer`, if any.
    async fn remove_pending_chunk(&self, peer: SocketAddr, sequence: u64) {
        let mut connections = self.connections.lock().await;
        if let Some(conn) = connections.get_mut(&peer) {
            conn.pending_chunks.remove(&sequence);
        }
    }

    /// Spawns a background task that periodically retransmits timed-out
    /// pending chunks (up to `max_retries`), drops chunks that exhausted
    /// their retries, and reaps connections idle for more than 30 seconds.
    pub fn start_retry_handler(
        socket: Arc<UdpSocket>,
        connections: Arc<Mutex<HashMap<SocketAddr, UdpConnection>>>,
        config: UdpTransportConfig,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_millis(500));
            loop {
                interval.tick().await;
                let now = Instant::now();
                let mut to_retry: Vec<(SocketAddr, PendingChunk)> = Vec::new();
                let mut to_remove: Vec<(SocketAddr, u64)> = Vec::new();
                let mut dead_connections: Vec<SocketAddr> = Vec::new();
                // Collect the work under the lock, then do the (slow) socket
                // I/O outside it.
                {
                    let mut conns = connections.lock().await;
                    for (peer_addr, conn) in conns.iter_mut() {
                        if now.duration_since(conn.last_seen) > Duration::from_secs(30) {
                            dead_connections.push(*peer_addr);
                            continue;
                        }
                        for (seq, pending) in conn.pending_chunks.iter_mut() {
                            let elapsed = now.duration_since(pending.sent_at);
                            if elapsed >= config.chunk_timeout {
                                if pending.retry_count < config.max_retries {
                                    pending.retry_count += 1;
                                    pending.sent_at = now;
                                    to_retry.push((*peer_addr, pending.clone()));
                                } else {
                                    to_remove.push((*peer_addr, *seq));
                                }
                            }
                        }
                    }
                    for peer in &dead_connections {
                        conns.remove(peer);
                        debug!("Removed dead FIBRE connection: {}", peer);
                    }
                }
                for (peer_addr, pending) in to_retry {
                    let packet: Vec<u8> = match pending.chunk.serialize() {
                        Ok(p) => p,
                        Err(e) => {
                            warn!(
                                "Failed to serialize chunk for retry to {}: {}",
                                peer_addr, e
                            );
                            continue;
                        }
                    };
                    if let Err(e) = socket.send_to(&packet, peer_addr).await {
                        warn!("Failed to retry chunk to {}: {}", peer_addr, e);
                    } else {
                        debug!(
                            "Retried chunk {} to {} (attempt {})",
                            pending.chunk.sequence, peer_addr, pending.retry_count
                        );
                    }
                }
                {
                    let mut conns = connections.lock().await;
                    for (peer_addr, seq) in to_remove {
                        if let Some(conn) = conns.get_mut(&peer_addr) {
                            conn.pending_chunks.remove(&seq);
                            debug!("Removed chunk {} from {} after max retries", seq, peer_addr);
                        }
                    }
                }
            }
        })
    }

    /// Blocking receive of a single chunk on this transport's socket.
    async fn recv_chunk(&self) -> Result<(SocketAddr, FecChunk), FibreError> {
        // Delegate to the shared helper; the two code paths were previously
        // identical copies.
        Self::recv_chunk_internal(&self.socket).await
    }

    /// Spawns a background task that receives chunks from the socket, updates
    /// per-peer connection state, and forwards each accepted chunk over
    /// `chunk_tx`. The task exits when the channel is closed.
    pub fn start_receiver(
        socket: Arc<UdpSocket>,
        chunk_tx: tokio::sync::mpsc::UnboundedSender<(SocketAddr, FecChunk)>,
        connections: Arc<Mutex<HashMap<SocketAddr, UdpConnection>>>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            loop {
                match Self::recv_chunk_internal(&socket).await {
                    Ok((peer_addr, chunk)) => {
                        {
                            let mut conns = connections.lock().await;
                            let conn = conns.entry(peer_addr).or_insert_with(|| UdpConnection {
                                peer_addr,
                                last_seen: Instant::now(),
                                out_sequence: 0,
                                in_sequence: 0,
                                pending_chunks: HashMap::new(),
                            });
                            conn.last_seen = Instant::now();
                            // Every chunk of one block carries the SAME
                            // sequence number, so only a strictly older
                            // sequence is stale. The previous `>` check also
                            // dropped chunks with an *equal* sequence, which
                            // discarded all sibling chunks after the first and
                            // made block reassembly impossible. True duplicate
                            // chunks are harmless: the assembly map keyed by
                            // chunk index absorbs them.
                            if chunk.sequence < conn.in_sequence {
                                debug!("Dropping duplicate FIBRE chunk from {}", peer_addr);
                                continue;
                            }
                            conn.in_sequence = chunk.sequence;
                        }
                        if chunk_tx.send((peer_addr, chunk)).is_err() {
                            debug!("FIBRE chunk channel closed, stopping receiver");
                            break;
                        }
                    }
                    Err(e) => {
                        warn!("FIBRE UDP receive error: {}", e);
                    }
                }
            }
        })
    }

    /// Receives one datagram from `socket` and parses it as an [`FecChunk`].
    async fn recv_chunk_internal(
        socket: &Arc<UdpSocket>,
    ) -> Result<(SocketAddr, FecChunk), FibreError> {
        let mut buffer = vec![0u8; blvm_protocol::fibre::MAX_PACKET_SIZE];
        let (len, peer_addr) = socket
            .recv_from(&mut buffer)
            .await
            .map_err(|e| FibreError::UdpError(format!("Failed to receive UDP packet: {e}")))?;
        buffer.truncate(len);
        let chunk = FecChunk::deserialize(&buffer)
            .map_err(|e| FibreError::UdpError(format!("Failed to deserialize chunk: {e}")))?;
        Ok((peer_addr, chunk))
    }
}
impl Default for FibreRelay {
fn default() -> Self {
Self::new()
}
}
impl FibreRelay {
pub fn new() -> Self {
Self {
encoded_blocks: HashMap::new(),
fibre_peers: HashMap::new(),
cache_ttl: Duration::from_secs(300), fec_encoder: None,
udp_transport: None,
receiving_blocks: HashMap::new(),
sequence_counter: 0,
config: FibreConfig::default(),
message_tx: None,
stats: Arc::new(Mutex::new(FibreStatsInternal {
blocks_sent: 0,
blocks_received: 0,
chunks_sent: 0,
chunks_received: 0,
chunks_retransmitted: 0,
fec_recoveries: 0,
udp_errors: 0,
total_latency_ms: 0.0,
total_successful_sends: 0,
total_send_attempts: 0,
})),
}
}
pub fn with_config(config: FibreConfig) -> Self {
Self {
encoded_blocks: HashMap::new(),
fibre_peers: HashMap::new(),
cache_ttl: Duration::from_secs(300),
fec_encoder: None,
udp_transport: None,
receiving_blocks: HashMap::new(),
sequence_counter: 0,
config,
message_tx: None,
stats: Arc::new(Mutex::new(FibreStatsInternal {
blocks_sent: 0,
blocks_received: 0,
chunks_sent: 0,
chunks_received: 0,
chunks_retransmitted: 0,
fec_recoveries: 0,
udp_errors: 0,
total_latency_ms: 0.0,
total_successful_sends: 0,
total_send_attempts: 0,
})),
}
}
pub fn set_message_sender(
&mut self,
tx: tokio::sync::mpsc::UnboundedSender<crate::network::NetworkMessage>,
) {
self.message_tx = Some(tx);
}
pub async fn initialize_udp(
&mut self,
bind_addr: SocketAddr,
) -> Result<tokio::sync::mpsc::UnboundedReceiver<(SocketAddr, FecChunk)>, FibreError> {
let config = UdpTransportConfig {
chunk_timeout: Duration::from_secs(self.config.chunk_timeout_secs),
max_retries: self.config.max_retries,
bind_addr,
};
let transport = UdpTransport::bind(config.clone()).await?;
let local_addr = transport.local_addr;
let socket = transport.socket.clone();
let connections = transport.connections.clone();
let transport_arc = Arc::new(Mutex::new(transport));
self.udp_transport = Some(transport_arc);
let (chunk_tx, chunk_rx) = tokio::sync::mpsc::unbounded_channel();
UdpTransport::start_receiver(socket.clone(), chunk_tx, connections.clone());
UdpTransport::start_retry_handler(socket, connections, config);
info!(
"FIBRE UDP transport initialized on {} with retry handler",
local_addr
);
Ok(chunk_rx)
}
pub fn register_fibre_peer(&mut self, peer_id: String, udp_addr: Option<SocketAddr>) {
let peer_info = FibrePeerInfo {
peer_id: peer_id.clone(),
udp_addr,
capabilities: FibreCapabilities::default(),
last_relay: None,
};
self.fibre_peers.insert(peer_id, peer_info);
debug!("Registered FIBRE peer");
}
pub fn encode_block(&mut self, block: Block) -> Result<EncodedBlock, FibreError> {
let mut header_bytes = [0u8; 80];
header_bytes[0..4].copy_from_slice(&(block.header.version as i32).to_le_bytes());
header_bytes[4..36].copy_from_slice(&block.header.prev_block_hash);
header_bytes[36..68].copy_from_slice(&block.header.merkle_root);
header_bytes[68..72].copy_from_slice(&(block.header.timestamp as u32).to_le_bytes());
header_bytes[72..76].copy_from_slice(&(block.header.bits as u32).to_le_bytes());
header_bytes[76..80].copy_from_slice(&(block.header.nonce as u32).to_le_bytes());
let first_hash = sha2::Sha256::digest(header_bytes);
let second_hash = sha2::Sha256::digest(first_hash);
let mut block_hash = [0u8; 32];
block_hash.copy_from_slice(&second_hash);
if let Some(encoded) = self.encoded_blocks.get(&block_hash) {
if encoded.encoded_at.elapsed() < self.cache_ttl {
return Ok(encoded.clone());
}
}
let block_data = bincode::serialize(&block)
.map_err(|e| FibreError::SerializationError(e.to_string()))?;
let shard_size = blvm_protocol::fibre::DEFAULT_SHARD_SIZE;
let data_shards = block_data.len().div_ceil(shard_size);
let parity_shards = ((data_shards as f64) * self.config.fec_parity_ratio).ceil() as usize;
let total_shards = data_shards + parity_shards;
if self.fec_encoder.is_none() {
self.fec_encoder = Some(FecEncoder::new(data_shards, parity_shards, shard_size)?);
}
let encoder = self.fec_encoder.as_ref().unwrap();
let shards = encoder.encode(&block_data)?;
let sequence = self.sequence_counter;
self.sequence_counter = self.sequence_counter.wrapping_add(1);
let chunks: Vec<FecChunk> = shards
.into_iter()
.enumerate()
.map(|(i, shard_data)| {
let size = shard_data.len();
FecChunk {
index: i as u32,
total_chunks: total_shards as u32,
data_chunks: data_shards as u32,
data: shard_data,
size,
block_hash,
sequence,
magic: FIBRE_MAGIC,
}
})
.collect();
let chunk_count = chunks.len() as u32;
let encoded = EncodedBlock {
block_hash,
block,
chunks,
chunk_count,
encoded_at: Instant::now(),
data_chunks: data_shards as u32,
};
let encoded_for_cache = encoded.clone();
self.encoded_blocks.insert(block_hash, encoded_for_cache);
info!(
"Encoded block {} for FIBRE transmission ({} data + {} parity = {} total chunks)",
hex::encode(block_hash),
data_shards,
parity_shards,
encoded.chunk_count
);
Ok(encoded)
}
pub async fn send_block(
&mut self,
peer_id: &str,
encoded: EncodedBlock,
) -> Result<(), FibreError> {
let start_time = Instant::now();
let chunk_count = encoded.chunks.len() as u64;
let peer = self
.fibre_peers
.get(peer_id)
.ok_or_else(|| FibreError::UdpError(format!("Peer not found: {peer_id}")))?;
let udp_addr = peer
.udp_addr
.ok_or_else(|| FibreError::UdpError("Peer has no UDP address".to_string()))?;
let transport = self
.udp_transport
.as_ref()
.ok_or_else(|| FibreError::UdpError("UDP transport not initialized".to_string()))?;
let packets: Result<Vec<_>, _> = encoded
.chunks
.iter()
.map(|chunk| {
chunk
.serialize()
.map_err(|e| FibreError::UdpError(format!("Serialization failed: {e}")))
})
.collect();
let packets = packets?;
let send_result = {
let transport_guard = transport.lock().await;
let socket = transport_guard.socket.clone();
let connections = transport_guard.connections.clone();
drop(transport_guard);
let mut send_errors = 0u64;
for packet in &packets {
if packet.len() > blvm_protocol::fibre::MAX_PACKET_SIZE {
send_errors += 1;
continue;
}
if let Err(e) = socket.send_to(packet, udp_addr).await {
send_errors += 1;
debug!("Failed to send UDP packet: {e}");
}
}
let mut conns = connections.lock().await;
let conn = conns.entry(udp_addr).or_insert_with(|| UdpConnection {
peer_addr: udp_addr,
last_seen: Instant::now(),
out_sequence: 0,
in_sequence: 0,
pending_chunks: HashMap::new(),
});
conn.last_seen = Instant::now();
conn.out_sequence = conn.out_sequence.wrapping_add(packets.len() as u64);
if send_errors > 0 {
Err(FibreError::UdpError(format!(
"Failed to send {} packets",
send_errors
)))
} else {
Ok(())
}
};
let latency_ms = start_time.elapsed().as_millis() as f64;
match send_result {
Ok(()) => {
let mut stats = self.stats.lock().await;
stats.total_send_attempts += 1;
stats.chunks_sent += chunk_count;
stats.blocks_sent += 1;
stats.total_successful_sends += 1;
stats.total_latency_ms += latency_ms;
drop(stats);
self.mark_relay_success(peer_id);
Ok(())
}
Err(e) => {
let mut stats = self.stats.lock().await;
stats.total_send_attempts += 1;
stats.chunks_sent += chunk_count;
stats.udp_errors += 1;
Err(e)
}
}
}
pub async fn process_received_chunk(
&mut self,
chunk: FecChunk,
) -> Result<Option<Block>, FibreError> {
{
let mut stats = self.stats.lock().await;
stats.chunks_received += 1;
}
if self.receiving_blocks.len() >= self.config.max_assemblies {
let oldest = self
.receiving_blocks
.iter()
.min_by_key(|(_, assembly)| assembly.received_at)
.map(|(hash, _)| *hash);
if let Some(hash) = oldest {
self.receiving_blocks.remove(&hash);
warn!(
"Removed oldest block assembly due to limit: {}",
hex::encode(hash)
);
}
}
let block_hash = chunk.block_hash;
let data_shards = chunk.data_chunks as usize;
let parity_shards = chunk.total_chunks.saturating_sub(chunk.data_chunks) as usize;
const MAX_TOTAL_SHARDS: usize = 1024;
if data_shards == 0
|| chunk.total_chunks < chunk.data_chunks
|| chunk.total_chunks as usize > MAX_TOTAL_SHARDS
{
return Err(FibreError::FecError(format!(
"Invalid FEC chunk params: data_chunks={}, total_chunks={}",
chunk.data_chunks, chunk.total_chunks
)));
}
let shard_size = blvm_protocol::fibre::DEFAULT_SHARD_SIZE;
let should_reconstruct = {
if !self.receiving_blocks.contains_key(&block_hash) {
let fec_encoder = FecEncoder::new(data_shards, parity_shards, shard_size)?;
self.receiving_blocks.insert(
block_hash,
BlockAssembly {
block_hash: chunk.block_hash,
received_chunks: HashMap::new(),
total_chunks: chunk.total_chunks,
data_chunks: chunk.data_chunks,
received_at: Instant::now(),
fec_encoder,
},
);
}
let assembly = match self.receiving_blocks.get_mut(&block_hash) {
Some(a) => a,
None => {
return Err(FibreError::FecError(
"FIBRE block assembly missing after insert".to_string(),
));
}
};
assembly.received_chunks.insert(chunk.index, chunk.clone());
let received_count = assembly.received_chunks.len();
let required_count = assembly.data_chunks as usize;
received_count >= required_count
};
if should_reconstruct {
let (total_chunks, fec_encoder, received_chunks, block_hash_check) = {
let Some(assembly) = self.receiving_blocks.get(&block_hash) else {
return Err(FibreError::FecError(
"FIBRE block assembly missing before reconstruction".to_string(),
));
};
(
assembly.total_chunks,
assembly.fec_encoder.clone(),
assembly.received_chunks.clone(),
assembly.block_hash,
)
};
self.receiving_blocks.remove(&block_hash);
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total_chunks as usize];
let received_count = received_chunks.len();
let required_count = total_chunks as usize;
for (idx, chunk) in &received_chunks {
shards[*idx as usize] = Some(chunk.data.clone());
}
if received_count < required_count {
let mut stats = self.stats.lock().await;
stats.fec_recoveries += 1;
}
let block_data = fec_encoder.decode(&shards)?;
let block: Block = bincode::deserialize(&block_data).map_err(|e| {
FibreError::SerializationError(format!("Failed to deserialize block: {e}"))
})?;
let mut header_bytes = Vec::with_capacity(80);
header_bytes.extend_from_slice(&(block.header.version as i32).to_le_bytes());
header_bytes.extend_from_slice(&block.header.prev_block_hash);
header_bytes.extend_from_slice(&block.header.merkle_root);
header_bytes.extend_from_slice(&(block.header.timestamp as u32).to_le_bytes());
header_bytes.extend_from_slice(&(block.header.bits as u32).to_le_bytes());
header_bytes.extend_from_slice(&(block.header.nonce as u32).to_le_bytes());
let first_hash = sha2::Sha256::digest(header_bytes);
let second_hash = sha2::Sha256::digest(first_hash);
let mut computed_hash = [0u8; 32];
computed_hash.copy_from_slice(&second_hash);
if computed_hash != block_hash_check {
return Err(FibreError::UdpError(
"Block hash mismatch after reconstruction".to_string(),
));
}
info!(
"FIBRE block {} assembled successfully",
hex::encode(block_hash_check)
);
return Ok(Some(block));
}
Ok(None)
}
pub fn get_encoded_block(&self, block_hash: &Hash) -> Option<&EncodedBlock> {
self.encoded_blocks
.get(block_hash)
.filter(|e| e.encoded_at.elapsed() < self.cache_ttl)
}
pub async fn udp_local_addr(&self) -> Option<SocketAddr> {
match &self.udp_transport {
Some(t) => Some(t.lock().await.local_addr),
None => None,
}
}
pub fn get_fibre_peers(&self) -> Vec<&FibrePeerInfo> {
self.fibre_peers.values().collect()
}
pub fn is_fibre_peer(&self, peer_id: &str) -> bool {
self.fibre_peers.contains_key(peer_id)
}
pub fn mark_relay_success(&mut self, peer_id: &str) {
if let Some(peer) = self.fibre_peers.get_mut(peer_id) {
peer.last_relay = Some(Instant::now());
}
}
pub fn cleanup_expired(&mut self) {
let now = Instant::now();
let expired: Vec<Hash> = self
.encoded_blocks
.iter()
.filter(|(_, encoded)| encoded.encoded_at.elapsed() >= self.cache_ttl)
.map(|(hash, _)| *hash)
.collect();
for hash in expired {
self.encoded_blocks.remove(&hash);
debug!(
"Cleaned up expired FIBRE encoded block {}",
hex::encode(hash)
);
}
let stale: Vec<Hash> = self
.receiving_blocks
.iter()
.filter(|(_, assembly)| {
now.duration_since(assembly.received_at) > Duration::from_secs(30)
})
.map(|(hash, _)| *hash)
.collect();
for hash in stale {
self.receiving_blocks.remove(&hash);
debug!(
"Cleaned up stale FIBRE block assembly {}",
hex::encode(hash)
);
}
}
pub async fn get_stats(&self) -> FibreStats {
let stats = self.stats.lock().await;
let total_attempts = stats.total_send_attempts.max(1); let success_rate = if total_attempts > 0 {
stats.total_successful_sends as f64 / total_attempts as f64
} else {
1.0
};
let average_latency = if stats.total_successful_sends > 0 {
stats.total_latency_ms / stats.total_successful_sends as f64
} else {
0.0
};
FibreStats {
encoded_blocks: self.encoded_blocks.len(),
fibre_peers: self.fibre_peers.len(),
cache_ttl_secs: self.cache_ttl.as_secs(),
blocks_sent: stats.blocks_sent,
blocks_received: stats.blocks_received,
chunks_sent: stats.chunks_sent,
chunks_received: stats.chunks_received,
chunks_retransmitted: stats.chunks_retransmitted,
fec_recoveries: stats.fec_recoveries,
udp_errors: stats.udp_errors,
average_latency_ms: average_latency,
success_rate,
}
}
}
/// Spawns a background task that drains `chunk_rx`, feeds each chunk into the
/// relay, and forwards every fully-assembled block to the network layer via
/// the relay's `message_tx` channel (when one is set).
pub fn start_chunk_processor(
    relay: Arc<Mutex<FibreRelay>>,
    mut chunk_rx: tokio::sync::mpsc::UnboundedReceiver<(SocketAddr, FecChunk)>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        while let Some((peer_addr, chunk)) = chunk_rx.recv().await {
            // Hold the relay lock only while the chunk is being processed.
            let outcome = relay.lock().await.process_received_chunk(chunk).await;
            let block = match outcome {
                Ok(Some(block)) => block,
                Ok(None) => {
                    debug!("FIBRE block assembly in progress from {}", peer_addr);
                    continue;
                }
                Err(e) => {
                    warn!("FIBRE chunk processing error from {}: {}", peer_addr, e);
                    continue;
                }
            };
            let message_tx = relay.lock().await.message_tx.clone();
            let Some(tx) = message_tx else {
                debug!(
                    "FIBRE block assembled from {} but no message_tx channel set",
                    peer_addr
                );
                continue;
            };
            let block_data = match bincode::serialize(&block) {
                Ok(data) => data,
                Err(e) => {
                    warn!(
                        "FIBRE: Failed to serialize assembled block from {}: {}",
                        peer_addr, e
                    );
                    continue;
                }
            };
            if tx
                .send(crate::network::NetworkMessage::BlockReceived(block_data))
                .is_err()
            {
                warn!("FIBRE: Failed to send assembled block to NetworkManager (channel closed)");
            } else {
                info!(
                    "FIBRE block assembled from {} and sent to NetworkManager",
                    peer_addr
                );
            }
        }
        debug!("FIBRE chunk processor stopped");
    })
}
/// Point-in-time snapshot of relay activity, returned by
/// [`FibreRelay::get_stats`].
#[derive(Debug, Clone)]
pub struct FibreStats {
    /// Number of blocks currently held in the encode cache.
    pub encoded_blocks: usize,
    /// Number of registered FIBRE peers.
    pub fibre_peers: usize,
    /// Encode-cache TTL in seconds.
    pub cache_ttl_secs: u64,
    /// Blocks fully sent.
    pub blocks_sent: u64,
    /// Blocks fully reassembled.
    pub blocks_received: u64,
    /// Chunks transmitted.
    pub chunks_sent: u64,
    /// Chunks received.
    pub chunks_received: u64,
    /// Chunks retransmitted after timeout.
    pub chunks_retransmitted: u64,
    /// Blocks reconstructed despite missing chunks.
    pub fec_recoveries: u64,
    /// Recorded UDP send failures.
    pub udp_errors: u64,
    /// Mean latency of successful block sends, in milliseconds.
    pub average_latency_ms: f64,
    /// Fraction of send attempts that succeeded.
    pub success_rate: f64,
}
/// Errors produced by the FIBRE relay and its UDP transport.
#[derive(Debug, thiserror::Error)]
pub enum FibreError {
    /// Failure while (de)serializing a block or chunk.
    #[error("Serialization error: {0}")]
    SerializationError(String),
    /// Failure during FEC encode, decode, or reconstruction.
    #[error("FEC encoding error: {0}")]
    FecError(String),
    /// Socket-level or packet-format failure.
    #[error("UDP transmission error: {0}")]
    UdpError(String),
    /// Requested block is not present in the encode cache.
    #[error("Block not found in cache")]
    BlockNotFound,
}
impl From<FibreProtocolError> for FibreError {
    /// Maps protocol-level errors onto the relay's error categories:
    /// malformed packets become UDP errors, serialization failures stay
    /// serialization failures.
    fn from(err: FibreProtocolError) -> Self {
        match err {
            FibreProtocolError::InvalidPacket(msg) => Self::UdpError(msg),
            FibreProtocolError::SerializationError(msg) => Self::SerializationError(msg),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use blvm_protocol::BlockHeader;

    /// Deterministic single-block fixture shared by the tests below.
    fn create_test_block() -> Block {
        let header = BlockHeader {
            version: 1,
            prev_block_hash: [0x11; 32],
            merkle_root: [0x22; 32],
            timestamp: 1234567890,
            bits: 0x1d00ffff,
            nonce: 0x12345678,
        };
        Block {
            header,
            transactions: vec![].into(),
        }
    }

    #[test]
    fn test_fibre_relay_new() {
        let relay = FibreRelay::new();
        assert_eq!(relay.encoded_blocks.len(), 0);
        assert_eq!(relay.fibre_peers.len(), 0);
        assert!(relay.udp_transport.is_none());
    }

    #[test]
    fn test_fibre_relay_encode_block() {
        let mut relay = FibreRelay::new();
        let encoded = relay.encode_block(create_test_block()).unwrap();
        assert!(encoded.chunk_count > 0);
        assert_eq!(encoded.chunks.len(), encoded.chunk_count as usize);
        // Recompute the double-SHA-256 header hash and check it matches the
        // hash the relay stamped on the encoding.
        let header = &encoded.block.header;
        let mut header_bytes = Vec::with_capacity(80);
        header_bytes.extend_from_slice(&(header.version as i32).to_le_bytes());
        header_bytes.extend_from_slice(&header.prev_block_hash);
        header_bytes.extend_from_slice(&header.merkle_root);
        header_bytes.extend_from_slice(&(header.timestamp as u32).to_le_bytes());
        header_bytes.extend_from_slice(&(header.bits as u32).to_le_bytes());
        header_bytes.extend_from_slice(&(header.nonce as u32).to_le_bytes());
        let round_one = sha2::Sha256::digest(header_bytes);
        let round_two = sha2::Sha256::digest(round_one);
        let mut expected = [0u8; 32];
        expected.copy_from_slice(&round_two);
        assert_eq!(encoded.block_hash, expected);
    }

    #[test]
    fn test_fibre_relay_encode_block_caching() {
        let mut relay = FibreRelay::new();
        let block = create_test_block();
        let first = relay.encode_block(block.clone()).unwrap();
        let second = relay.encode_block(block).unwrap();
        assert_eq!(first.block_hash, second.block_hash);
        assert_eq!(first.chunk_count, second.chunk_count);
    }

    #[test]
    fn test_fibre_relay_register_peer() {
        let mut relay = FibreRelay::new();
        let udp_addr: SocketAddr = "127.0.0.1:8334".parse().unwrap();
        relay.register_fibre_peer("peer1".to_string(), Some(udp_addr));
        assert!(relay.is_fibre_peer("peer1"));
        assert!(!relay.is_fibre_peer("peer2"));
        let peers = relay.get_fibre_peers();
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].peer_id, "peer1");
        assert_eq!(peers[0].udp_addr, Some(udp_addr));
    }

    #[test]
    fn test_fec_encoder_encode_decode() {
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let data_shards = 2;
        let parity_shards = 1;
        let encoder = FecEncoder::new(data_shards, parity_shards, 5).unwrap();
        let shards = encoder.encode(&data).unwrap();
        assert_eq!(shards.len(), data_shards + parity_shards);
        let present: Vec<Option<Vec<u8>>> = shards.iter().map(|s| Some(s.clone())).collect();
        let decoded = encoder.decode(&present).unwrap();
        assert_eq!(decoded[..data.len()], data);
    }

    #[test]
    #[ignore]
    fn test_fec_encoder_decode_with_missing_shards() {
        // NOTE(review): despite the name, this test decodes with ALL shards
        // present — it does not exercise erasure recovery. Confirm the intent
        // before un-ignoring.
        let data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let data_shards = 2;
        let parity_shards = 1;
        let encoder = FecEncoder::new(data_shards, parity_shards, 5).unwrap();
        let shards = encoder.encode(&data).unwrap();
        assert_eq!(shards.len(), data_shards + parity_shards);
        let present: Vec<Option<Vec<u8>>> = shards.iter().map(|s| Some(s.clone())).collect();
        let decoded = encoder.decode(&present).unwrap();
        assert_eq!(decoded[..data.len()], data);
    }

    #[test]
    fn test_fibre_relay_cleanup_expired() {
        let mut relay = FibreRelay::new();
        let encoded = relay.encode_block(create_test_block()).unwrap();
        relay.cleanup_expired();
        // A freshly-encoded block is well within the TTL and must survive.
        assert!(relay.get_encoded_block(&encoded.block_hash).is_some());
    }
}