use std::sync::Arc;
use std::time::Duration;
use freenet_stdlib::prelude::{ContractKey, WrappedState};
use crate::node::OpManager;
use crate::ring::PeerKeyLocation;
use super::p2p_protoc::P2pBridge;
/// Longest time `broadcast_to_single_peer` waits for a streamed broadcast to
/// signal completion; once it elapses a warning is logged and the function
/// stops waiting.
const STREAM_COMPLETION_TIMEOUT: Duration = Duration::from_secs(120);
#[cfg(not(feature = "simulation_tests"))]
mod queue {
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use freenet_stdlib::prelude::{ContractKey, WrappedState};
use tokio::sync::{Mutex, Notify, Semaphore};
use crate::node::OpManager;
use crate::ring::PeerKeyLocation;
use super::super::p2p_protoc::P2pBridge;
use super::broadcast_to_single_peer;
/// Default permit count of the semaphore used for payloads smaller than
/// `PAYLOAD_SIZE_THRESHOLD`.
const DEFAULT_SMALL_PAYLOAD_CONCURRENCY: usize = 12;
/// Default permit count of the semaphore used for payloads at or above
/// `PAYLOAD_SIZE_THRESHOLD`.
const DEFAULT_LARGE_PAYLOAD_CONCURRENCY: usize = 2;
/// Boundary between "small" and "large" payloads, compared against
/// `WrappedState::size()` (presumably bytes, i.e. 64 KiB — confirm).
const PAYLOAD_SIZE_THRESHOLD: usize = 64 * 1024;
/// Default maximum number of pending entries; when the queue is at this depth,
/// enqueueing a new entry evicts the oldest one(s) first.
const DEFAULT_MAX_QUEUE_DEPTH: usize = 256;
/// Identity of a pending broadcast: at most one entry is kept per
/// (contract, target peer) pair.
type DedupeKey = (ContractKey, PeerKeyLocation);
/// One pending broadcast: a contract state waiting to be sent to one peer.
struct BroadcastEntry {
/// Contract whose state is being broadcast.
key: ContractKey,
/// Peer the state will be sent to.
target: PeerKeyLocation,
/// State to send; `enqueue` overwrites it when a newer state arrives for the
/// same (contract, peer) pair.
new_state: WrappedState,
/// Size of the queued state as reported by `WrappedState::size()`; the worker
/// compares it with `PAYLOAD_SIZE_THRESHOLD` to choose a semaphore.
payload_size: usize,
}
/// Pending broadcasts in insertion order, with keyed lookup for de-duplication.
struct QueueState {
/// Keys in the order their entries were first enqueued (front = oldest).
order: VecDeque<DedupeKey>,
/// Pending entry for each key; replacing an entry here does not move its
/// position in `order`.
entries: HashMap<DedupeKey, BroadcastEntry>,
}
impl QueueState {
    /// Builds a queue with no pending entries.
    fn new() -> Self {
        let order = VecDeque::new();
        let entries = HashMap::new();
        Self { order, entries }
    }

    /// Number of pending entries, counted from the entry map.
    fn len(&self) -> usize {
        self.entries.len()
    }

    /// Removes and returns the oldest pending entry.
    ///
    /// Keys in `order` that have no matching entry are discarded and the search
    /// continues with the next key. Returns `None` once `order` is exhausted.
    fn pop_front(&mut self) -> Option<BroadcastEntry> {
        loop {
            // `?` ends the search when no keys are left.
            let oldest_key = self.order.pop_front()?;
            if let Some(entry) = self.entries.remove(&oldest_key) {
                return Some(entry);
            }
        }
    }
}
/// Handle to a de-duplicating queue of pending state broadcasts.
///
/// Cloning the handle clones the `Arc`s, so all clones share the same pending
/// entries and the same wake-up signal.
#[derive(Clone)]
pub(crate) struct BroadcastQueue {
/// Pending entries, guarded by an async mutex.
queue: Arc<Mutex<QueueState>>,
/// Signalled by `enqueue`; the worker awaits it when it finds the queue empty.
notify: Arc<Notify>,
/// Permit count for the semaphore that gates payloads below
/// `PAYLOAD_SIZE_THRESHOLD`.
small_payload_concurrency: usize,
/// Permit count for the semaphore that gates payloads at or above
/// `PAYLOAD_SIZE_THRESHOLD`.
large_payload_concurrency: usize,
/// Queue depth at which `enqueue` starts evicting the oldest entries.
max_queue_depth: usize,
}
impl BroadcastQueue {
    /// Creates an empty queue configured with the module's default concurrency
    /// limits and maximum depth.
    pub(crate) fn new() -> Self {
        Self {
            queue: Arc::new(Mutex::new(QueueState::new())),
            notify: Arc::new(Notify::new()),
            small_payload_concurrency: DEFAULT_SMALL_PAYLOAD_CONCURRENCY,
            large_payload_concurrency: DEFAULT_LARGE_PAYLOAD_CONCURRENCY,
            max_queue_depth: DEFAULT_MAX_QUEUE_DEPTH,
        }
    }

    /// Queues `new_state` of contract `key` for broadcast to `target`.
    ///
    /// * If a broadcast for the same `(key, target)` pair is already pending,
    ///   its state and recorded payload size are replaced in place; the entry
    ///   keeps its position in the queue.
    /// * Otherwise the oldest entries are evicted (with a warning) until the
    ///   queue is below `max_queue_depth`, and the new entry is appended.
    ///
    /// In both cases the worker is notified afterwards.
    pub(crate) async fn enqueue(
        &self,
        key: ContractKey,
        target: PeerKeyLocation,
        new_state: WrappedState,
    ) {
        let dedup_key = (key, target.clone());
        let mut queue = self.queue.lock().await;
        if let Some(existing) = queue.entries.get_mut(&dedup_key) {
            // Keep the recorded size in step with the state it describes: the
            // worker selects the small/large semaphore from `payload_size`, so
            // a stale value would gate the new state with the wrong limit.
            existing.payload_size = new_state.size();
            existing.new_state = new_state;
            tracing::trace!(
                contract = %dedup_key.0,
                peer = ?target.socket_addr(),
                "Broadcast queue: replaced stale entry with newer state"
            );
        } else {
            // Make room first so the queue never exceeds `max_queue_depth`.
            while queue.len() >= self.max_queue_depth {
                if let Some(entry) = queue.pop_front() {
                    tracing::warn!(
                        contract = %entry.key,
                        peer = ?entry.target.socket_addr(),
                        queue_depth = self.max_queue_depth,
                        "Broadcast queue full, evicted oldest entry"
                    );
                } else {
                    // Nothing left to evict (e.g. a depth limit of zero).
                    break;
                }
            }
            let payload_size = new_state.size();
            queue.entries.insert(
                dedup_key.clone(),
                BroadcastEntry {
                    key,
                    target,
                    new_state,
                    payload_size,
                },
            );
            queue.order.push_back(dedup_key);
        }
        // Release the lock before waking the worker so it can pop immediately.
        drop(queue);
        self.notify.notify_one();
    }

    /// Spawns the task that drains the queue and returns its join handle.
    ///
    /// The task repeatedly pops the oldest entry, acquires a permit from the
    /// small- or large-payload semaphore (chosen by comparing the entry's
    /// `payload_size` with `PAYLOAD_SIZE_THRESHOLD`), and spawns a task that
    /// calls `broadcast_to_single_peer` and holds the permit until it returns.
    /// The permit is awaited inside the drain loop, so while one semaphore is
    /// exhausted no further entries are popped.
    ///
    /// Both semaphores are created per call, so the limits apply per worker.
    /// The task only ends if a semaphore is found closed.
    pub(crate) fn start_worker(
        &self,
        bridge: P2pBridge,
        op_manager: Arc<OpManager>,
    ) -> tokio::task::JoinHandle<()> {
        let queue = self.queue.clone();
        let notify = self.notify.clone();
        let small_semaphore = Arc::new(Semaphore::new(self.small_payload_concurrency));
        let large_semaphore = Arc::new(Semaphore::new(self.large_payload_concurrency));
        tokio::spawn(async move {
            loop {
                // Created before draining; awaited only if the drain found nothing.
                let notified = notify.notified();
                let mut drained_any = false;
                loop {
                    // Hold the lock only for the pop, never across a send.
                    let entry = {
                        let mut q = queue.lock().await;
                        q.pop_front()
                    };
                    let Some(entry) = entry else {
                        break;
                    };
                    drained_any = true;
                    let sem = if entry.payload_size < PAYLOAD_SIZE_THRESHOLD {
                        small_semaphore.clone()
                    } else {
                        large_semaphore.clone()
                    };
                    let permit = sem.acquire_owned().await;
                    let Ok(permit) = permit else {
                        tracing::error!("Broadcast queue semaphore closed unexpectedly");
                        return;
                    };
                    let bridge = bridge.clone();
                    let op_manager = op_manager.clone();
                    tokio::spawn(async move {
                        // Dropped when this task finishes, freeing the slot.
                        let _permit = permit;
                        broadcast_to_single_peer(
                            &bridge,
                            &op_manager,
                            entry.key,
                            entry.new_state,
                            entry.target,
                        )
                        .await;
                    });
                }
                // After a productive pass, re-check the queue before sleeping.
                if !drained_any {
                    notified.await;
                }
            }
        })
    }
}
}
#[cfg(not(feature = "simulation_tests"))]
pub(crate) use queue::BroadcastQueue;
/// Sends the state of contract `key` to one peer, as a delta when possible.
///
/// Steps, in order:
/// 1. Return silently if `target` has no socket address.
/// 2. Look up our summary of the contract and the cached summary for the peer;
///    if both exist and are equal, skip the broadcast.
/// 3. If both exist and differ, try to compute a delta; if that yields nothing
///    or fails, or if either summary is missing, fall back to the full state.
/// 4. Full-state payloads that `should_use_streaming` accepts are sent as a
///    `BroadcastToStreaming` message followed by the stream data, then the
///    function waits up to `STREAM_COMPLETION_TIMEOUT` for completion.
///    Everything else goes out as a single `BroadcastTo` message.
/// 5. On a successful send, record delta/full-state metrics and refresh the
///    peer's interest; after a delta send, also cache our summary as the
///    peer's. On failure, log a warning.
///
/// Errors are logged, never returned.
pub(super) async fn broadcast_to_single_peer(
    bridge: &P2pBridge,
    op_manager: &Arc<OpManager>,
    key: ContractKey,
    new_state: WrappedState,
    target: PeerKeyLocation,
) {
    use crate::message::{DeltaOrFullState, NetMessage};
    use crate::node::network_bridge::NetworkBridge;
    use crate::operations::update::{BroadcastStreamingPayload, UpdateMsg};
    use crate::ring::PeerKey;
    use crate::transport::peer_connection::StreamId;

    // Without an address there is nowhere to send to.
    let peer_addr = match target.socket_addr() {
        Some(addr) => addr,
        None => return,
    };
    let peer_key = PeerKey::from(target.pub_key().clone());
    let interest = &op_manager.interest_manager;

    let our_summary = interest.get_contract_summary(op_manager, &key).await;
    let their_summary = interest.get_peer_summary(&key, &peer_key);

    // `Some(bytes)` when a delta could be computed, `None` when the full state
    // has to be sent instead.
    let delta_bytes = match (&our_summary, &their_summary) {
        (Some(ours), Some(theirs)) => {
            if ours.as_ref() == theirs.as_ref() {
                tracing::trace!(
                    contract = %key,
                    peer = %peer_addr,
                    "Skipping broadcast - peer already has our state"
                );
                return;
            }
            let computed = interest
                .compute_delta(op_manager, &key, theirs, ours, new_state.size())
                .await;
            match computed {
                Ok(Some(delta)) => Some(delta.as_ref().to_vec()),
                Ok(None) => {
                    tracing::debug!(
                        contract = %key,
                        "Delta computation returned no change, sending full state"
                    );
                    None
                }
                Err(err) => {
                    tracing::debug!(
                        contract = %key,
                        error = %err,
                        "Delta computation failed, falling back to full state"
                    );
                    None
                }
            }
        }
        _ => None,
    };
    let (payload, sent_delta) = match delta_bytes {
        Some(bytes) => (DeltaOrFullState::Delta(bytes), true),
        None => (
            DeltaOrFullState::FullState(new_state.as_ref().to_vec()),
            false,
        ),
    };

    let payload_size = payload.size();
    let update_tx = crate::message::Transaction::new::<UpdateMsg>();
    // Both message kinds carry our summary (empty when we have none).
    let sender_summary_bytes = our_summary
        .as_ref()
        .map(|s| s.as_ref().to_vec())
        .unwrap_or_default();

    // Only full-state payloads are ever streamed.
    let use_streaming = matches!(&payload, DeltaOrFullState::FullState(_))
        && crate::operations::should_use_streaming(op_manager.streaming_threshold, payload_size);

    let send_result = if use_streaming {
        let DeltaOrFullState::FullState(state_bytes) = payload else {
            unreachable!("checked above")
        };
        let streaming_payload = BroadcastStreamingPayload {
            state_bytes,
            sender_summary_bytes,
        };
        let payload_bytes = match bincode::serialize(&streaming_payload) {
            Ok(encoded) => encoded,
            Err(e) => {
                tracing::warn!(
                    tx = %update_tx,
                    error = %e,
                    "Failed to serialize BroadcastStreamingPayload, skipping"
                );
                return;
            }
        };
        let stream_id = StreamId::next_operations();
        tracing::debug!(
            tx = %update_tx,
            contract = %key,
            peer = %peer_addr,
            stream_id = %stream_id,
            payload_size,
            "Using streaming for BroadcastTo (via queue)"
        );
        let net_msg: NetMessage = UpdateMsg::BroadcastToStreaming {
            id: update_tx,
            stream_id,
            key,
            total_size: payload_bytes.len() as u64,
        }
        .into();
        // The serialized header is also handed to the stream send as metadata;
        // if it cannot be serialized the stream is sent without it.
        let metadata = bincode::serialize(&net_msg)
            .map_err(|e| {
                tracing::warn!(
                    ?peer_addr,
                    error = %e,
                    "Failed to serialize BroadcastTo metadata for embedding"
                );
            })
            .ok()
            .map(bytes::Bytes::from);

        let send_res = bridge.send(peer_addr, net_msg).await;
        if send_res.is_ok() {
            let (completion_tx, completion_rx) = tokio::sync::oneshot::channel();
            let stream_sent = bridge
                .send_stream_with_completion(
                    peer_addr,
                    stream_id,
                    bytes::Bytes::from(payload_bytes),
                    metadata,
                    Some(completion_tx),
                )
                .await;
            match stream_sent {
                // NOTE(review): this failure is only logged; `send_res` below
                // still reports the header send as successful — confirm intended.
                Err(err) => {
                    tracing::warn!(
                        tx = %update_tx,
                        peer = %peer_addr,
                        error = %err,
                        "Failed to send broadcast stream data"
                    );
                }
                // Wait (bounded) for the stream to finish before returning.
                Ok(_) => match tokio::time::timeout(STREAM_COMPLETION_TIMEOUT, completion_rx).await
                {
                    Ok(Ok(())) => {
                        tracing::debug!(
                            tx = %update_tx,
                            peer = %peer_addr,
                            "Broadcast stream completed successfully"
                        );
                    }
                    Ok(Err(_)) => {
                        tracing::debug!(
                            tx = %update_tx,
                            peer = %peer_addr,
                            "Broadcast stream completion channel dropped (task ended)"
                        );
                    }
                    Err(_) => {
                        tracing::warn!(
                            tx = %update_tx,
                            peer = %peer_addr,
                            timeout_secs = STREAM_COMPLETION_TIMEOUT.as_secs(),
                            "Broadcast stream completion timed out, releasing permit"
                        );
                    }
                },
            }
        }
        send_res
    } else {
        let msg = UpdateMsg::BroadcastTo {
            id: update_tx,
            key,
            payload,
            sender_summary_bytes,
        };
        bridge.send(peer_addr, msg.into()).await
    };

    match send_result {
        Err(err) => {
            tracing::warn!(
                tx = %update_tx,
                peer = %peer_addr,
                error = %err,
                "Failed to send state change broadcast (queued)"
            );
        }
        Ok(_) => {
            if sent_delta {
                interest.record_delta_send(new_state.size(), payload_size);
                crate::config::GlobalTestMetrics::record_delta_send();
            } else {
                interest.record_full_state_send();
                crate::config::GlobalTestMetrics::record_full_state_send();
            }
            interest.refresh_peer_interest(&key, &peer_key);

            // NOTE(review): the cached peer summary is updated only after delta
            // sends, never after full-state sends — confirm this asymmetry is
            // intended.
            if sent_delta {
                if let Some(summary) = &our_summary {
                    interest.update_peer_summary(&key, &peer_key, Some(summary.clone()));
                }
            }
        }
    }
}