use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use freenet_stdlib::prelude::{ContractKey, WrappedState};
use crate::node::OpManager;
use crate::ring::PeerKeyLocation;
use crate::transport::BroadcastDeliveryOutcome;
use super::p2p_protoc::P2pBridge;
/// Upper bound on how long a streamed broadcast waits for its delivery-completion
/// signal; an elapsed wait is classified as "not delivered".
const STREAM_COMPLETION_TIMEOUT: Duration = Duration::from_secs(120);

/// Process-wide counters for streamed (`BroadcastToStreaming`) state broadcasts.
pub(crate) static BROADCAST_STREAM_METRICS: BroadcastStreamMetrics =
    BroadcastStreamMetrics::new();

/// Lock-free counters tracking streamed broadcast attempts and how many of them were
/// not confirmed as delivered.
///
/// Invariant exposed through [`BroadcastStreamMetrics::snapshot`]: a snapshot never
/// reports more failures than attempts.
pub(crate) struct BroadcastStreamMetrics {
    /// Every streamed broadcast attempt, delivered or not.
    streaming_attempts_total: AtomicU64,
    /// Attempts that were recorded as not delivered.
    streaming_failures_total: AtomicU64,
}

/// Point-in-time copy of [`BroadcastStreamMetrics`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BroadcastStreamMetricsSnapshot {
    pub streaming_attempts_total: u64,
    pub streaming_failures_total: u64,
}

impl BroadcastStreamMetrics {
    /// Creates zeroed counters; `const` so it can initialise a `static`.
    const fn new() -> Self {
        Self {
            streaming_attempts_total: AtomicU64::new(0),
            streaming_failures_total: AtomicU64::new(0),
        }
    }

    /// Records one streamed broadcast attempt, counting it as a failure unless
    /// `delivered` is true.
    fn record_attempt(&self, delivered: bool) {
        // Count the attempt BEFORE the failure, and publish the failure with
        // `Release`. Paired with the `Acquire` load in `snapshot`, every attempt
        // that preceded an observed failure is visible to the snapshot.
        self.streaming_attempts_total
            .fetch_add(1, Ordering::Relaxed);
        if !delivered {
            self.streaming_failures_total
                .fetch_add(1, Ordering::Release);
        }
    }

    /// Returns the current counter values.
    ///
    /// The counters are read one after another, so this is not a single atomic view.
    /// It does guarantee `streaming_failures_total <= streaming_attempts_total`:
    /// - failures are read first, with `Acquire`;
    /// - the attempts read that follows therefore sees at least every attempt that
    ///   was counted before the observed failures.
    ///
    /// Reading attempts first (the old order) could report more failures than
    /// attempts when a record landed between the two loads.
    pub(crate) fn snapshot(&self) -> BroadcastStreamMetricsSnapshot {
        let streaming_failures_total = self.streaming_failures_total.load(Ordering::Acquire);
        let streaming_attempts_total = self.streaming_attempts_total.load(Ordering::Relaxed);
        BroadcastStreamMetricsSnapshot {
            streaming_attempts_total,
            streaming_failures_total,
        }
    }
}
/// Decides whether state changes for `key` should be broadcast from this node.
///
/// Hosting is checked first. The in-use check only runs for a contract that is not
/// hosted. It keeps broadcasts flowing for a contract that is still in use after it
/// stopped being hosted (e.g. evicted).
pub(super) fn should_broadcast_contract(op_manager: &Arc<OpManager>, key: &ContractKey) -> bool {
    let ring = &op_manager.ring;
    if ring.is_hosting_contract(key) {
        return true;
    }
    ring.contract_in_use(key)
}
#[cfg(not(feature = "simulation_tests"))]
mod queue {
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use freenet_stdlib::prelude::{ContractKey, WrappedState};
use tokio::sync::{Mutex, Notify, Semaphore};
use crate::node::OpManager;
use crate::ring::PeerKeyLocation;
use super::super::p2p_protoc::P2pBridge;
use super::broadcast_to_single_peer;
const DEFAULT_SMALL_PAYLOAD_CONCURRENCY: usize = 12;
const DEFAULT_LARGE_PAYLOAD_CONCURRENCY: usize = 2;
const PAYLOAD_SIZE_THRESHOLD: usize = 64 * 1024;
const DEFAULT_MAX_QUEUE_DEPTH: usize = 256;
type DedupeKey = (ContractKey, PeerKeyLocation);
struct BroadcastEntry {
key: ContractKey,
target: PeerKeyLocation,
new_state: WrappedState,
payload_size: usize,
}
struct QueueState {
order: VecDeque<DedupeKey>,
entries: HashMap<DedupeKey, BroadcastEntry>,
}
impl QueueState {
fn new() -> Self {
Self {
order: VecDeque::new(),
entries: HashMap::new(),
}
}
fn len(&self) -> usize {
self.entries.len()
}
fn pop_front(&mut self) -> Option<BroadcastEntry> {
while let Some(key) = self.order.pop_front() {
if let Some(entry) = self.entries.remove(&key) {
return Some(entry);
}
}
None
}
}
#[derive(Clone)]
pub(crate) struct BroadcastQueue {
queue: Arc<Mutex<QueueState>>,
notify: Arc<Notify>,
small_payload_concurrency: usize,
large_payload_concurrency: usize,
max_queue_depth: usize,
}
impl BroadcastQueue {
pub(crate) fn new() -> Self {
Self {
queue: Arc::new(Mutex::new(QueueState::new())),
notify: Arc::new(Notify::new()),
small_payload_concurrency: DEFAULT_SMALL_PAYLOAD_CONCURRENCY,
large_payload_concurrency: DEFAULT_LARGE_PAYLOAD_CONCURRENCY,
max_queue_depth: DEFAULT_MAX_QUEUE_DEPTH,
}
}
pub(crate) async fn enqueue(
&self,
key: ContractKey,
target: PeerKeyLocation,
new_state: WrappedState,
) {
let dedup_key = (key, target.clone());
let mut queue = self.queue.lock().await;
if let Some(existing) = queue.entries.get_mut(&dedup_key) {
existing.new_state = new_state;
tracing::trace!(
contract = %dedup_key.0,
peer = ?target.socket_addr(),
"Broadcast queue: replaced stale entry with newer state"
);
} else {
while queue.len() >= self.max_queue_depth {
if let Some(entry) = queue.pop_front() {
tracing::warn!(
contract = %entry.key,
peer = ?entry.target.socket_addr(),
queue_depth = self.max_queue_depth,
"Broadcast queue full, evicted oldest entry"
);
} else {
break;
}
}
let payload_size = new_state.size();
queue.entries.insert(
dedup_key.clone(),
BroadcastEntry {
key,
target,
new_state,
payload_size,
},
);
queue.order.push_back(dedup_key);
}
crate::transport::shadow_demand::record_broadcast_queue_depth(queue.len());
drop(queue);
self.notify.notify_one();
}
pub(crate) fn start_worker(
&self,
bridge: P2pBridge,
op_manager: Arc<OpManager>,
) -> tokio::task::JoinHandle<()> {
let queue = self.queue.clone();
let notify = self.notify.clone();
let small_semaphore = Arc::new(Semaphore::new(self.small_payload_concurrency));
let large_semaphore = Arc::new(Semaphore::new(self.large_payload_concurrency));
tokio::spawn(async move {
loop {
let notified = notify.notified();
let mut drained_any = false;
loop {
let entry = {
let mut q = queue.lock().await;
let entry = q.pop_front();
crate::transport::shadow_demand::record_broadcast_queue_depth(q.len());
entry
};
let Some(entry) = entry else {
break; };
drained_any = true;
let sem = if entry.payload_size < PAYLOAD_SIZE_THRESHOLD {
small_semaphore.clone()
} else {
large_semaphore.clone()
};
let permit = sem.acquire_owned().await;
let Ok(permit) = permit else {
tracing::error!("Broadcast queue semaphore closed unexpectedly");
return;
};
let bridge = bridge.clone();
let op_manager = op_manager.clone();
tokio::spawn(async move {
let _permit = permit;
broadcast_to_single_peer(
&bridge,
&op_manager,
entry.key,
entry.new_state,
entry.target,
)
.await;
});
}
if !drained_any {
notified.await;
}
}
})
}
}
}
#[cfg(not(feature = "simulation_tests"))]
pub(crate) use queue::BroadcastQueue;
/// Outcome of waiting for a streamed broadcast's completion signal:
/// - the outer `Err` means the wait timed out;
/// - the inner `Err` means the completion sender was dropped without reporting an
///   outcome.
type StreamCompletionResult = Result<
    Result<BroadcastDeliveryOutcome, tokio::sync::oneshot::error::RecvError>,
    tokio::time::error::Elapsed,
>;

/// Returns `true` only for an explicit `Delivered` outcome.
///
/// An explicit drop, a dropped completion channel and a timeout all count as not
/// delivered (#4235).
fn streaming_completion_delivered(completion: StreamCompletionResult) -> bool {
    let outcome = completion.ok().and_then(Result::ok);
    matches!(outcome, Some(BroadcastDeliveryOutcome::Delivered))
}
/// Classifies a streamed broadcast's completion. Only when it was actually delivered
/// does it record the send against the peer's interest entry.
///
/// Returns `true` only for an explicit `Delivered` outcome. Every other outcome
/// returns `false` and leaves `interest_manager` untouched:
/// - an explicit drop;
/// - a dropped completion channel;
/// - a timeout.
// Forwards record_delivery_to_interest's inputs plus the completion result.
#[allow(clippy::too_many_arguments)]
fn record_streaming_delivery<T: crate::util::time_source::TimeSource + Sync>(
    interest_manager: &crate::ring::interest::InterestManager<T>,
    completion: StreamCompletionResult,
    sent_delta: bool,
    key: &ContractKey,
    peer_key: &crate::ring::PeerKey,
    our_summary: Option<&freenet_stdlib::prelude::StateSummary<'static>>,
    state_size: usize,
    payload_size: usize,
) -> bool {
    if !streaming_completion_delivered(completion) {
        return false;
    }
    record_delivery_to_interest(
        interest_manager,
        sent_delta,
        key,
        peer_key,
        our_summary,
        state_size,
        payload_size,
    );
    true
}
/// Records a broadcast of contract `key` that reached `peer_key`.
///
/// In order, it:
/// 1. bumps the delta or full-state send counters, both on the interest manager and
///    on `GlobalTestMetrics`;
/// 2. refreshes the peer's interest entry;
/// 3. when we have a summary of our own state, caches it as the peer's summary, so
///    the next broadcast to that peer can be computed as a delta.
fn record_delivery_to_interest<T: crate::util::time_source::TimeSource + Sync>(
    interest_manager: &crate::ring::interest::InterestManager<T>,
    sent_delta: bool,
    key: &ContractKey,
    peer_key: &crate::ring::PeerKey,
    our_summary: Option<&freenet_stdlib::prelude::StateSummary<'static>>,
    state_size: usize,
    payload_size: usize,
) {
    if sent_delta {
        interest_manager.record_delta_send(state_size, payload_size);
        crate::config::GlobalTestMetrics::record_delta_send();
    } else {
        interest_manager.record_full_state_send();
        crate::config::GlobalTestMetrics::record_full_state_send();
    }

    interest_manager.refresh_peer_interest(key, peer_key);

    // The peer now holds our state, so our summary is its summary.
    if let Some(delivered_summary) = our_summary.cloned() {
        interest_manager.update_peer_summary(key, peer_key, Some(delivered_summary));
    }
}
/// Sends the latest `new_state` of contract `key` to a single peer, `target`.
///
/// In this file it is called from the broadcast queue worker, once per dequeued entry.
///
/// It returns early, without sending, when:
/// - `target` has no socket address;
/// - `should_broadcast_contract` reports the contract as neither hosted nor in use;
/// - the cached summary of the peer's state equals our own summary.
///
/// Payload choice:
/// - a delta, when both summaries are known and a delta is produced;
/// - otherwise the full state.
///
/// A full state that `should_use_streaming` approves is streamed:
/// - a `BroadcastToStreaming` announcement is sent first, then the stream payload;
/// - the function then waits up to `STREAM_COMPLETION_TIMEOUT` for a delivery outcome;
/// - when called from the queue worker, the worker's concurrency permit stays held
///   during that wait;
/// - every streamed attempt is counted in `BROADCAST_STREAM_METRICS`.
///
/// Interest bookkeeping (send counters, interest refresh, cached peer summary) is
/// updated only after success. For the inline path that is a successful send; for
/// streams it is an explicit `Delivered` outcome.
///
/// Errors are logged, never returned.
pub(super) async fn broadcast_to_single_peer(
    bridge: &P2pBridge,
    op_manager: &Arc<OpManager>,
    key: ContractKey,
    new_state: WrappedState,
    target: PeerKeyLocation,
) {
    use crate::message::{DeltaOrFullState, NetMessage};
    use crate::node::network_bridge::NetworkBridge;
    use crate::operations::update::{BroadcastStreamingPayload, UpdateMsg};
    use crate::ring::PeerKey;
    use crate::transport::peer_connection::StreamId;
    // Without a socket address there is nowhere to send to.
    let Some(peer_addr) = target.socket_addr() else {
        return;
    };
    // Gate BEFORE the summary lookup below. Per this file's pin test (#4473),
    // summarizing every contract we neither host nor use caused a storm.
    if !should_broadcast_contract(op_manager, &key) {
        tracing::trace!(
            contract = %key,
            peer = %peer_addr,
            "Skipping broadcast - contract not hosted or in use"
        );
        return;
    }
    let peer_key = PeerKey::from(target.pub_key().clone());
    // Our summary comes from our current stored state. The peer's summary is whatever
    // the interest manager has cached for it (e.g. after an earlier delivered
    // broadcast); it may be absent.
    let our_summary = op_manager
        .interest_manager
        .get_contract_summary(op_manager, &key)
        .await;
    let their_summary = op_manager
        .interest_manager
        .get_peer_summary(&key, &peer_key);
    // Identical summaries: the peer is already up to date.
    if let (Some(ours), Some(theirs)) = (&our_summary, &their_summary) {
        if ours.as_ref() == theirs.as_ref() {
            tracing::trace!(
                contract = %key,
                peer = %peer_addr,
                "Skipping broadcast - peer already has our state"
            );
            return;
        }
    }
    // Prefer a delta when both summaries are known. Fall back to the full state when
    // no delta is produced, when its computation fails, or when either summary is
    // missing.
    let (payload, sent_delta) = match (&our_summary, &their_summary) {
        (Some(ours), Some(theirs)) => {
            match op_manager
                .interest_manager
                .compute_delta(op_manager, &key, theirs, ours, new_state.size())
                .await
            {
                Ok(Some(delta)) => (DeltaOrFullState::Delta(delta.as_ref().to_vec()), true),
                Ok(None) => {
                    tracing::debug!(
                        contract = %key,
                        "Delta computation returned no change, sending full state"
                    );
                    (
                        DeltaOrFullState::FullState(new_state.as_ref().to_vec()),
                        false,
                    )
                }
                Err(err) => {
                    tracing::debug!(
                        contract = %key,
                        error = %err,
                        "Delta computation failed, falling back to full state"
                    );
                    (
                        DeltaOrFullState::FullState(new_state.as_ref().to_vec()),
                        false,
                    )
                }
            }
        }
        _ => (
            DeltaOrFullState::FullState(new_state.as_ref().to_vec()),
            false,
        ),
    };
    let payload_size = payload.size();
    let update_tx = crate::message::Transaction::new::<crate::operations::update::UpdateMsg>();
    // Only full-state payloads are ever streamed; deltas always go inline.
    let use_streaming = matches!(&payload, DeltaOrFullState::FullState(_))
        && crate::operations::should_use_streaming(op_manager.streaming_threshold, payload_size);
    let send_result = if use_streaming {
        let sender_summary_bytes = our_summary
            .as_ref()
            .map(|s| s.as_ref().to_vec())
            .unwrap_or_default();
        let state_bytes = match payload {
            DeltaOrFullState::FullState(data) => data,
            _ => unreachable!("checked above"),
        };
        let streaming_payload = BroadcastStreamingPayload {
            state_bytes,
            sender_summary_bytes,
        };
        // A serialization failure abandons the broadcast before any network activity,
        // so no streaming attempt is counted for it.
        let payload_bytes = match bincode::serialize(&streaming_payload) {
            Ok(b) => b,
            Err(e) => {
                tracing::warn!(
                    tx = %update_tx,
                    error = %e,
                    "Failed to serialize BroadcastStreamingPayload, skipping"
                );
                return;
            }
        };
        let sid = StreamId::next_operations();
        tracing::debug!(
            tx = %update_tx,
            contract = %key,
            peer = %peer_addr,
            stream_id = %sid,
            payload_size,
            "Using streaming for BroadcastTo (via queue)"
        );
        // Announcement telling the peer which stream id and total size to expect.
        let msg = UpdateMsg::BroadcastToStreaming {
            id: update_tx,
            stream_id: sid,
            key,
            total_size: payload_bytes.len() as u64,
        };
        let net_msg: NetMessage = msg.into();
        // The serialized announcement is also embedded as stream metadata. If that
        // serialization fails, the stream is sent without metadata.
        let metadata = match bincode::serialize(&net_msg) {
            Ok(bytes) => Some(bytes::Bytes::from(bytes)),
            Err(e) => {
                tracing::warn!(
                    ?peer_addr,
                    error = %e,
                    "Failed to serialize BroadcastTo metadata for embedding"
                );
                None
            }
        };
        let send_res = bridge.send(peer_addr, net_msg).await;
        if send_res.is_err() {
            // Failure exit 1: the announcement was not sent, so no stream is started.
            BROADCAST_STREAM_METRICS.record_attempt(false);
        } else {
            let (completion_tx, completion_rx) = tokio::sync::oneshot::channel();
            if let Err(err) = bridge
                .send_stream_with_completion(
                    peer_addr,
                    sid,
                    bytes::Bytes::from(payload_bytes),
                    metadata,
                    Some(completion_tx),
                )
                .await
            {
                // Failure exit 2: the stream could not be dispatched.
                BROADCAST_STREAM_METRICS.record_attempt(false);
                tracing::warn!(
                    tx = %update_tx,
                    peer = %peer_addr,
                    error = %err,
                    "Failed to send broadcast stream data"
                );
            } else {
                // Exit 3: wait (bounded) for the transport's delivery outcome. Only an
                // explicit `Delivered` refreshes interest and caches the peer summary
                // (#4235). A drop or timeout leaves the peer eligible for a resend.
                let completion =
                    tokio::time::timeout(STREAM_COMPLETION_TIMEOUT, completion_rx).await;
                let delivered = record_streaming_delivery(
                    &op_manager.interest_manager,
                    completion,
                    sent_delta,
                    &key,
                    &peer_key,
                    our_summary.as_ref(),
                    new_state.size(),
                    payload_size,
                );
                BROADCAST_STREAM_METRICS.record_attempt(delivered);
                if delivered {
                    tracing::debug!(
                        tx = %update_tx,
                        peer = %peer_addr,
                        "Broadcast stream completed successfully"
                    );
                } else {
                    tracing::debug!(
                        tx = %update_tx,
                        peer = %peer_addr,
                        timeout_secs = STREAM_COMPLETION_TIMEOUT.as_secs(),
                        "Broadcast stream dropped or timed out before delivery \
                         (permit released, interest NOT refreshed)"
                    );
                }
            }
        }
        // Only the announcement result propagates to the shared warning below.
        // Stream dispatch failures were already logged above.
        send_res
    } else {
        let msg = UpdateMsg::BroadcastTo {
            id: update_tx,
            key,
            payload,
            sender_summary_bytes: our_summary
                .as_ref()
                .map(|s| s.as_ref().to_vec())
                .unwrap_or_default(),
        };
        let res = bridge.send(peer_addr, msg.into()).await;
        // Inline path: a successful send is treated as delivery for bookkeeping.
        if res.is_ok() {
            record_delivery_to_interest(
                &op_manager.interest_manager,
                sent_delta,
                &key,
                &peer_key,
                our_summary.as_ref(),
                new_state.size(),
                payload_size,
            );
        }
        res
    };
    if let Err(err) = &send_result {
        tracing::warn!(
            tx = %update_tx,
            peer = %peer_addr,
            error = %err,
            "Failed to send state change broadcast (queued)"
        );
    }
}
#[cfg(test)]
mod tests {
    // This module holds two kinds of tests:
    // - unit tests for the delivery-classification and metrics helpers;
    // - source-scanning "pin" tests, which read this file via `include_str!` and
    //   assert on its text.
    // The pin tests match exact code strings. Renaming or restructuring the pinned
    // call sites in the production code must be mirrored here.
    use std::time::Duration;
    use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey, StateSummary};
    use crate::ring::PeerKey;
    use crate::ring::interest::InterestManager;
    use crate::transport::{BroadcastDeliveryOutcome, TransportKeypair};
    use crate::util::time_source::SharedMockTimeSource;
    use super::{
        BroadcastStreamMetrics, record_streaming_delivery, streaming_completion_delivered,
    };

    /// Counters start at zero. Every attempt increments the attempt total; only
    /// undelivered attempts increment the failure total.
    #[test]
    fn broadcast_stream_metrics_counts_attempts_and_failures() {
        let m = BroadcastStreamMetrics::new();
        let s = m.snapshot();
        assert_eq!(s.streaming_attempts_total, 0, "starts at zero");
        assert_eq!(s.streaming_failures_total, 0, "starts at zero");
        m.record_attempt(true);
        let s = m.snapshot();
        assert_eq!(s.streaming_attempts_total, 1);
        assert_eq!(s.streaming_failures_total, 0, "delivered is not a failure");
        m.record_attempt(false);
        let s = m.snapshot();
        assert_eq!(s.streaming_attempts_total, 2, "every attempt counts");
        assert_eq!(s.streaming_failures_total, 1, "the drop is counted");
        m.record_attempt(false);
        m.record_attempt(true);
        let s = m.snapshot();
        assert_eq!(s.streaming_attempts_total, 4);
        assert_eq!(s.streaming_failures_total, 2);
    }

    /// Builds a deterministic contract key from `seed`.
    fn make_contract_key(seed: u8) -> ContractKey {
        ContractKey::from_id_and_code(
            ContractInstanceId::new([seed; 32]),
            CodeHash::new([seed.wrapping_add(1); 32]),
        )
    }

    /// Builds a peer key from a freshly generated transport keypair.
    fn make_peer_key() -> PeerKey {
        PeerKey(TransportKeypair::new().public().clone())
    }

    /// Produces the `RecvError` a completion oneshot yields when its sender is dropped
    /// without sending an outcome.
    async fn dropped_oneshot()
    -> Result<BroadcastDeliveryOutcome, tokio::sync::oneshot::error::RecvError> {
        let (tx, rx) = tokio::sync::oneshot::channel::<BroadcastDeliveryOutcome>();
        drop(tx);
        rx.await.map(|_| unreachable!("sender was dropped"))
    }

    /// Produces an `Elapsed` error by timing out (after 1 ms) on a oneshot that never
    /// receives a value. `tx` is kept alive until after the timeout, so the receive
    /// cannot resolve early with a `RecvError`.
    async fn elapsed_timeout() -> tokio::time::error::Elapsed {
        let (tx, rx) = tokio::sync::oneshot::channel::<BroadcastDeliveryOutcome>();
        let res = tokio::time::timeout(Duration::from_millis(1), rx).await;
        drop(tx);
        res.expect_err("never-resolving recv must time out")
    }

    /// Only an explicit `Delivered` outcome counts as delivered. A drop, a dropped
    /// channel and a timeout do not.
    #[tokio::test]
    async fn streaming_completion_delivered_only_on_explicit_delivery() {
        assert!(
            streaming_completion_delivered(Ok(Ok(BroadcastDeliveryOutcome::Delivered))),
            "an explicit Delivered outcome must be treated as a delivery"
        );
        assert!(
            !streaming_completion_delivered(Ok(Ok(BroadcastDeliveryOutcome::Dropped))),
            "an explicit Dropped outcome must NOT be treated as a delivery (#4235)"
        );
        assert!(
            !streaming_completion_delivered(Ok(dropped_oneshot().await)),
            "a dropped completion oneshot must NOT be treated as a delivery (#4235)"
        );
        assert!(
            !streaming_completion_delivered(Err(elapsed_timeout().await)),
            "a completion-wait timeout must NOT be treated as a delivery (#4235)"
        );
    }

    /// For each completion kind, checks the classification and the interest-manager
    /// side effects:
    /// - delivered: the interest TTL is refreshed and the summary is cached;
    /// - anything else: both are left untouched.
    /// The mock clock is advanced by 5 s so that a refresh is observable as a larger
    /// `last_refreshed`.
    #[tokio::test]
    async fn drop_outcome_does_not_refresh_interest_or_cache_summary() {
        let our_summary = StateSummary::from(vec![9, 9, 9, 9]);
        let dropped = dropped_oneshot().await;
        let timed_out = elapsed_timeout().await;
        let cases: Vec<(&str, super::StreamCompletionResult, bool)> = vec![
            (
                "delivered",
                Ok(Ok(BroadcastDeliveryOutcome::Delivered)),
                true,
            ),
            (
                "explicit-drop",
                Ok(Ok(BroadcastDeliveryOutcome::Dropped)),
                false,
            ),
            ("dropped-oneshot", Ok(dropped), false),
            ("timeout", Err(timed_out), false),
        ];
        for (name, completion, expect_delivered) in cases {
            // Fresh manager and clock per case so the cases cannot interfere.
            let time_source = SharedMockTimeSource::new();
            let manager = InterestManager::new(time_source.clone());
            let contract = make_contract_key(7);
            let peer = make_peer_key();
            manager.register_peer_interest(&contract, peer.clone(), None, false);
            let baseline = manager
                .get_peer_interest(&contract, &peer)
                .expect("peer interest registered")
                .last_refreshed;
            time_source.advance_time(Duration::from_secs(5));
            let delivered = record_streaming_delivery(
                &manager,
                completion,
                true,
                &contract,
                &peer,
                Some(&our_summary),
                1024,
                64,
            );
            assert_eq!(
                delivered, expect_delivered,
                "[{name}] classification mismatch"
            );
            let interest = manager
                .get_peer_interest(&contract, &peer)
                .expect("peer interest still registered");
            if expect_delivered {
                assert!(
                    interest.last_refreshed > baseline,
                    "[{name}] a real delivery MUST refresh the peer interest TTL"
                );
                assert_eq!(
                    manager.get_peer_summary(&contract, &peer),
                    Some(our_summary.clone()),
                    "[{name}] a real delivery MUST cache the peer summary"
                );
            } else {
                assert_eq!(
                    interest.last_refreshed, baseline,
                    "[{name}] a dropped/timed-out broadcast MUST NOT refresh the \
                     peer interest TTL (#4235)"
                );
                assert_eq!(
                    manager.get_peer_summary(&contract, &peer),
                    None,
                    "[{name}] a dropped/timed-out broadcast MUST NOT cache the peer \
                     summary, or the next summary-mismatch resend is suppressed (#4235)"
                );
            }
        }
    }

    /// A delivered full-state (non-delta) stream must still cache the peer summary,
    /// so that a later broadcast can be a delta (#4145).
    /// NOTE(review): the final `is_some` check restates the preceding equality. It
    /// documents the consequence and does not drive `broadcast_to_single_peer`.
    #[tokio::test]
    async fn full_state_delivery_caches_summary_so_next_broadcast_is_delta() {
        let our_summary = StateSummary::from(vec![1, 2, 3, 4]);
        let time_source = SharedMockTimeSource::new();
        let manager = InterestManager::new(time_source.clone());
        let contract = make_contract_key(42);
        let peer = make_peer_key();
        manager.register_peer_interest(&contract, peer.clone(), None, false);
        assert_eq!(
            manager.get_peer_summary(&contract, &peer),
            None,
            "precondition: a brand-new subscriber has no cached summary, so the \
             first broadcast must be full state"
        );
        let delivered = record_streaming_delivery(
            &manager,
            Ok(Ok(BroadcastDeliveryOutcome::Delivered)),
            false,
            &contract,
            &peer,
            Some(&our_summary),
            4096,
            4096,
        );
        assert!(delivered, "a Delivered outcome must classify as delivered");
        assert_eq!(
            manager.get_peer_summary(&contract, &peer),
            Some(our_summary.clone()),
            "#4145: a delivered FULL-STATE broadcast must cache the peer summary, \
             so the next broadcast can be a delta — otherwise the peer is trapped \
             sending full state forever (the #4233 storm)"
        );
        let their_summary = manager.get_peer_summary(&contract, &peer);
        assert!(
            their_summary.is_some(),
            "#4145: with a cached peer summary the next broadcast takes the delta \
             path (compute_delta), not another full state"
        );
    }

    /// Counterpart to the previous test: an undelivered full-state stream (explicit
    /// drop, dropped oneshot, or timeout) must not cache the summary.
    #[tokio::test]
    async fn dropped_full_state_stream_does_not_cache_summary() {
        let our_summary = StateSummary::from(vec![5, 6, 7, 8]);
        let dropped = dropped_oneshot().await;
        let timed_out = elapsed_timeout().await;
        let cases: Vec<(&str, super::StreamCompletionResult)> = vec![
            ("explicit-drop", Ok(Ok(BroadcastDeliveryOutcome::Dropped))),
            ("dropped-oneshot", Ok(dropped)),
            ("timeout", Err(timed_out)),
        ];
        for (name, completion) in cases {
            let time_source = SharedMockTimeSource::new();
            let manager = InterestManager::new(time_source.clone());
            let contract = make_contract_key(43);
            let peer = make_peer_key();
            manager.register_peer_interest(&contract, peer.clone(), None, false);
            let delivered = record_streaming_delivery(
                &manager,
                completion,
                false,
                &contract,
                &peer,
                Some(&our_summary),
                4096,
                4096,
            );
            assert!(
                !delivered,
                "[{name}] a dropped/timed-out full-state stream must NOT classify \
                 as delivered"
            );
            assert_eq!(
                manager.get_peer_summary(&contract, &peer),
                None,
                "[{name}] #4145 must not weaken the #2763/#4235 guard: a DROPPED \
                 full-state stream must NOT cache the summary (the peer never got \
                 the state), or the next summary-mismatch resend is suppressed"
            );
        }
    }

    /// Pin test (#4473). It scans this file's source text and checks that:
    /// - the gate helper checks both hosting and in-use;
    /// - `broadcast_to_single_peer` calls the gate before it fetches our contract
    ///   summary.
    /// The helper's extent is taken as everything up to the first "\n}\n" after its
    /// signature.
    #[test]
    fn broadcast_single_peer_gates_summarize_on_hosted_or_in_use_pin() {
        let src = include_str!("broadcast_queue.rs");
        let helper_start = src
            .find("pub(super) fn should_broadcast_contract(")
            .expect("should_broadcast_contract helper not found");
        let helper_end = helper_start
            + src[helper_start..]
                .find("\n}\n")
                .expect("should_broadcast_contract body end not found");
        let helper_src = &src[helper_start..helper_end];
        assert!(
            helper_src.contains("is_hosting_contract"),
            "should_broadcast_contract must gate on is_hosting_contract"
        );
        assert!(
            helper_src.contains("contract_in_use"),
            "should_broadcast_contract must ALSO gate on contract_in_use so an \
             evicted-but-in-use stateful contract keeps being broadcast"
        );
        let fn_start = src
            .find("pub(super) async fn broadcast_to_single_peer(")
            .expect("broadcast_to_single_peer not found");
        let fn_src = &src[fn_start..];
        let gate_off = fn_src.find("should_broadcast_contract(op_manager").expect(
            "broadcast_to_single_peer must call should_broadcast_contract — a bare \
             get_contract_summary here reintroduces the #4473 storm",
        );
        let summarize_off = fn_src
            .find("get_contract_summary(")
            .expect("broadcast_to_single_peer get_contract_summary call not found");
        assert!(
            gate_off < summarize_off,
            "broadcast_to_single_peer must gate on should_broadcast_contract BEFORE \
             calling get_contract_summary (#4473) — otherwise the summarize storm \
             fires for every phantom contract before the gate can skip it"
        );
    }

    /// Pin test for the streaming metrics. It counts metric-recording call sites
    /// textually, from the start of `broadcast_to_single_peer` to the start of this
    /// module, and expects:
    /// - three call sites in total;
    /// - exactly two of them record a failure.
    /// Any text in that span, comments included, that matches the searched pattern
    /// affects the count.
    #[test]
    fn broadcast_to_single_peer_records_attempt_on_every_streaming_exit_pin() {
        let src = include_str!("broadcast_queue.rs");
        let fn_start = src
            .find("pub(super) async fn broadcast_to_single_peer(")
            .expect("broadcast_to_single_peer not found");
        let after = &src[fn_start..];
        let fn_end = after
            .find("\nmod tests {")
            .or_else(|| after.find("\n#[cfg(test)]"))
            .expect("end of broadcast_to_single_peer (start of tests module) not found");
        let body = &after[..fn_end];
        let record_calls = body.matches(".record_attempt(").count();
        assert_eq!(
            record_calls, 3,
            "broadcast_to_single_peer's streaming branch must call record_attempt \
             on all three exits (initial-send Err, dispatch Err, post-dispatch \
             outcome) — got {record_calls}. A dropped early-exit record silently \
             biases the v0.2.73 incident gauge LOW under congestion."
        );
        let failure_calls = body.matches(".record_attempt(false)").count();
        assert_eq!(
            failure_calls, 2,
            "exactly the two early-failure exits must record record_attempt(false) \
             (got {failure_calls}); the third exit records record_attempt(delivered)"
        );
    }
}