use std::sync::Arc;
use std::time::Duration;
use freenet_stdlib::prelude::{ContractKey, WrappedState};
use crate::node::OpManager;
use crate::ring::PeerKeyLocation;
use crate::transport::BroadcastDeliveryOutcome;
use super::p2p_protoc::P2pBridge;
const STREAM_COMPLETION_TIMEOUT: Duration = Duration::from_secs(120);
#[cfg(not(feature = "simulation_tests"))]
mod queue {
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use freenet_stdlib::prelude::{ContractKey, WrappedState};
use tokio::sync::{Mutex, Notify, Semaphore};
use crate::node::OpManager;
use crate::ring::PeerKeyLocation;
use super::super::p2p_protoc::P2pBridge;
use super::broadcast_to_single_peer;
const DEFAULT_SMALL_PAYLOAD_CONCURRENCY: usize = 12;
const DEFAULT_LARGE_PAYLOAD_CONCURRENCY: usize = 2;
const PAYLOAD_SIZE_THRESHOLD: usize = 64 * 1024;
const DEFAULT_MAX_QUEUE_DEPTH: usize = 256;
type DedupeKey = (ContractKey, PeerKeyLocation);
struct BroadcastEntry {
key: ContractKey,
target: PeerKeyLocation,
new_state: WrappedState,
payload_size: usize,
}
struct QueueState {
order: VecDeque<DedupeKey>,
entries: HashMap<DedupeKey, BroadcastEntry>,
}
impl QueueState {
fn new() -> Self {
Self {
order: VecDeque::new(),
entries: HashMap::new(),
}
}
fn len(&self) -> usize {
self.entries.len()
}
fn pop_front(&mut self) -> Option<BroadcastEntry> {
while let Some(key) = self.order.pop_front() {
if let Some(entry) = self.entries.remove(&key) {
return Some(entry);
}
}
None
}
}
#[derive(Clone)]
pub(crate) struct BroadcastQueue {
queue: Arc<Mutex<QueueState>>,
notify: Arc<Notify>,
small_payload_concurrency: usize,
large_payload_concurrency: usize,
max_queue_depth: usize,
}
impl BroadcastQueue {
pub(crate) fn new() -> Self {
Self {
queue: Arc::new(Mutex::new(QueueState::new())),
notify: Arc::new(Notify::new()),
small_payload_concurrency: DEFAULT_SMALL_PAYLOAD_CONCURRENCY,
large_payload_concurrency: DEFAULT_LARGE_PAYLOAD_CONCURRENCY,
max_queue_depth: DEFAULT_MAX_QUEUE_DEPTH,
}
}
pub(crate) async fn enqueue(
&self,
key: ContractKey,
target: PeerKeyLocation,
new_state: WrappedState,
) {
let dedup_key = (key, target.clone());
let mut queue = self.queue.lock().await;
if let Some(existing) = queue.entries.get_mut(&dedup_key) {
existing.new_state = new_state;
tracing::trace!(
contract = %dedup_key.0,
peer = ?target.socket_addr(),
"Broadcast queue: replaced stale entry with newer state"
);
} else {
while queue.len() >= self.max_queue_depth {
if let Some(entry) = queue.pop_front() {
tracing::warn!(
contract = %entry.key,
peer = ?entry.target.socket_addr(),
queue_depth = self.max_queue_depth,
"Broadcast queue full, evicted oldest entry"
);
} else {
break;
}
}
let payload_size = new_state.size();
queue.entries.insert(
dedup_key.clone(),
BroadcastEntry {
key,
target,
new_state,
payload_size,
},
);
queue.order.push_back(dedup_key);
}
crate::transport::shadow_demand::record_broadcast_queue_depth(queue.len());
drop(queue);
self.notify.notify_one();
}
pub(crate) fn start_worker(
&self,
bridge: P2pBridge,
op_manager: Arc<OpManager>,
) -> tokio::task::JoinHandle<()> {
let queue = self.queue.clone();
let notify = self.notify.clone();
let small_semaphore = Arc::new(Semaphore::new(self.small_payload_concurrency));
let large_semaphore = Arc::new(Semaphore::new(self.large_payload_concurrency));
tokio::spawn(async move {
loop {
let notified = notify.notified();
let mut drained_any = false;
loop {
let entry = {
let mut q = queue.lock().await;
let entry = q.pop_front();
crate::transport::shadow_demand::record_broadcast_queue_depth(q.len());
entry
};
let Some(entry) = entry else {
break; };
drained_any = true;
let sem = if entry.payload_size < PAYLOAD_SIZE_THRESHOLD {
small_semaphore.clone()
} else {
large_semaphore.clone()
};
let permit = sem.acquire_owned().await;
let Ok(permit) = permit else {
tracing::error!("Broadcast queue semaphore closed unexpectedly");
return;
};
let bridge = bridge.clone();
let op_manager = op_manager.clone();
tokio::spawn(async move {
let _permit = permit;
broadcast_to_single_peer(
&bridge,
&op_manager,
entry.key,
entry.new_state,
entry.target,
)
.await;
});
}
if !drained_any {
notified.await;
}
}
})
}
}
}
#[cfg(not(feature = "simulation_tests"))]
pub(crate) use queue::BroadcastQueue;
fn streaming_completion_delivered(completion: StreamCompletionResult) -> bool {
matches!(completion, Ok(Ok(BroadcastDeliveryOutcome::Delivered)))
}
type StreamCompletionResult = Result<
Result<BroadcastDeliveryOutcome, tokio::sync::oneshot::error::RecvError>,
tokio::time::error::Elapsed,
>;
#[allow(clippy::too_many_arguments)]
fn record_streaming_delivery<T: crate::util::time_source::TimeSource + Sync>(
interest_manager: &crate::ring::interest::InterestManager<T>,
completion: StreamCompletionResult,
sent_delta: bool,
key: &ContractKey,
peer_key: &crate::ring::PeerKey,
our_summary: Option<&freenet_stdlib::prelude::StateSummary<'static>>,
state_size: usize,
payload_size: usize,
) -> bool {
let delivered = streaming_completion_delivered(completion);
if delivered {
record_delivery_to_interest(
interest_manager,
sent_delta,
key,
peer_key,
our_summary,
state_size,
payload_size,
);
}
delivered
}
fn record_delivery_to_interest<T: crate::util::time_source::TimeSource + Sync>(
interest_manager: &crate::ring::interest::InterestManager<T>,
sent_delta: bool,
key: &ContractKey,
peer_key: &crate::ring::PeerKey,
our_summary: Option<&freenet_stdlib::prelude::StateSummary<'static>>,
state_size: usize,
payload_size: usize,
) {
if sent_delta {
interest_manager.record_delta_send(state_size, payload_size);
crate::config::GlobalTestMetrics::record_delta_send();
} else {
interest_manager.record_full_state_send();
crate::config::GlobalTestMetrics::record_full_state_send();
}
interest_manager.refresh_peer_interest(key, peer_key);
if let Some(summary) = our_summary {
interest_manager.update_peer_summary(key, peer_key, Some(summary.clone()));
}
}
pub(super) async fn broadcast_to_single_peer(
bridge: &P2pBridge,
op_manager: &Arc<OpManager>,
key: ContractKey,
new_state: WrappedState,
target: PeerKeyLocation,
) {
use crate::message::{DeltaOrFullState, NetMessage};
use crate::node::network_bridge::NetworkBridge;
use crate::operations::update::{BroadcastStreamingPayload, UpdateMsg};
use crate::ring::PeerKey;
use crate::transport::peer_connection::StreamId;
let Some(peer_addr) = target.socket_addr() else {
return;
};
let peer_key = PeerKey::from(target.pub_key().clone());
let our_summary = op_manager
.interest_manager
.get_contract_summary(op_manager, &key)
.await;
let their_summary = op_manager
.interest_manager
.get_peer_summary(&key, &peer_key);
if let (Some(ours), Some(theirs)) = (&our_summary, &their_summary) {
if ours.as_ref() == theirs.as_ref() {
tracing::trace!(
contract = %key,
peer = %peer_addr,
"Skipping broadcast - peer already has our state"
);
return;
}
}
let (payload, sent_delta) = match (&our_summary, &their_summary) {
(Some(ours), Some(theirs)) => {
match op_manager
.interest_manager
.compute_delta(op_manager, &key, theirs, ours, new_state.size())
.await
{
Ok(Some(delta)) => (DeltaOrFullState::Delta(delta.as_ref().to_vec()), true),
Ok(None) => {
tracing::debug!(
contract = %key,
"Delta computation returned no change, sending full state"
);
(
DeltaOrFullState::FullState(new_state.as_ref().to_vec()),
false,
)
}
Err(err) => {
tracing::debug!(
contract = %key,
error = %err,
"Delta computation failed, falling back to full state"
);
(
DeltaOrFullState::FullState(new_state.as_ref().to_vec()),
false,
)
}
}
}
_ => (
DeltaOrFullState::FullState(new_state.as_ref().to_vec()),
false,
),
};
let payload_size = payload.size();
let update_tx = crate::message::Transaction::new::<crate::operations::update::UpdateMsg>();
let use_streaming = matches!(&payload, DeltaOrFullState::FullState(_))
&& crate::operations::should_use_streaming(op_manager.streaming_threshold, payload_size);
let send_result = if use_streaming {
let sender_summary_bytes = our_summary
.as_ref()
.map(|s| s.as_ref().to_vec())
.unwrap_or_default();
let state_bytes = match payload {
DeltaOrFullState::FullState(data) => data,
_ => unreachable!("checked above"),
};
let streaming_payload = BroadcastStreamingPayload {
state_bytes,
sender_summary_bytes,
};
let payload_bytes = match bincode::serialize(&streaming_payload) {
Ok(b) => b,
Err(e) => {
tracing::warn!(
tx = %update_tx,
error = %e,
"Failed to serialize BroadcastStreamingPayload, skipping"
);
return;
}
};
let sid = StreamId::next_operations();
tracing::debug!(
tx = %update_tx,
contract = %key,
peer = %peer_addr,
stream_id = %sid,
payload_size,
"Using streaming for BroadcastTo (via queue)"
);
let msg = UpdateMsg::BroadcastToStreaming {
id: update_tx,
stream_id: sid,
key,
total_size: payload_bytes.len() as u64,
};
let net_msg: NetMessage = msg.into();
let metadata = match bincode::serialize(&net_msg) {
Ok(bytes) => Some(bytes::Bytes::from(bytes)),
Err(e) => {
tracing::warn!(
?peer_addr,
error = %e,
"Failed to serialize BroadcastTo metadata for embedding"
);
None
}
};
let send_res = bridge.send(peer_addr, net_msg).await;
if send_res.is_ok() {
let (completion_tx, completion_rx) = tokio::sync::oneshot::channel();
if let Err(err) = bridge
.send_stream_with_completion(
peer_addr,
sid,
bytes::Bytes::from(payload_bytes),
metadata,
Some(completion_tx),
)
.await
{
tracing::warn!(
tx = %update_tx,
peer = %peer_addr,
error = %err,
"Failed to send broadcast stream data"
);
} else {
let completion =
tokio::time::timeout(STREAM_COMPLETION_TIMEOUT, completion_rx).await;
let delivered = record_streaming_delivery(
&op_manager.interest_manager,
completion,
sent_delta,
&key,
&peer_key,
our_summary.as_ref(),
new_state.size(),
payload_size,
);
if delivered {
tracing::debug!(
tx = %update_tx,
peer = %peer_addr,
"Broadcast stream completed successfully"
);
} else {
tracing::debug!(
tx = %update_tx,
peer = %peer_addr,
timeout_secs = STREAM_COMPLETION_TIMEOUT.as_secs(),
"Broadcast stream dropped or timed out before delivery \
(permit released, interest NOT refreshed)"
);
}
}
}
send_res
} else {
let msg = UpdateMsg::BroadcastTo {
id: update_tx,
key,
payload,
sender_summary_bytes: our_summary
.as_ref()
.map(|s| s.as_ref().to_vec())
.unwrap_or_default(),
};
let res = bridge.send(peer_addr, msg.into()).await;
if res.is_ok() {
record_delivery_to_interest(
&op_manager.interest_manager,
sent_delta,
&key,
&peer_key,
our_summary.as_ref(),
new_state.size(),
payload_size,
);
}
res
};
if let Err(err) = &send_result {
tracing::warn!(
tx = %update_tx,
peer = %peer_addr,
error = %err,
"Failed to send state change broadcast (queued)"
);
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use freenet_stdlib::prelude::{CodeHash, ContractInstanceId, ContractKey, StateSummary};
use crate::ring::PeerKey;
use crate::ring::interest::InterestManager;
use crate::transport::{BroadcastDeliveryOutcome, TransportKeypair};
use crate::util::time_source::SharedMockTimeSource;
use super::{record_streaming_delivery, streaming_completion_delivered};
fn make_contract_key(seed: u8) -> ContractKey {
ContractKey::from_id_and_code(
ContractInstanceId::new([seed; 32]),
CodeHash::new([seed.wrapping_add(1); 32]),
)
}
fn make_peer_key() -> PeerKey {
PeerKey(TransportKeypair::new().public().clone())
}
async fn dropped_oneshot()
-> Result<BroadcastDeliveryOutcome, tokio::sync::oneshot::error::RecvError> {
let (tx, rx) = tokio::sync::oneshot::channel::<BroadcastDeliveryOutcome>();
drop(tx);
rx.await.map(|_| unreachable!("sender was dropped"))
}
async fn elapsed_timeout() -> tokio::time::error::Elapsed {
let (tx, rx) = tokio::sync::oneshot::channel::<BroadcastDeliveryOutcome>();
let res = tokio::time::timeout(Duration::from_millis(1), rx).await;
drop(tx);
res.expect_err("never-resolving recv must time out")
}
#[tokio::test]
async fn streaming_completion_delivered_only_on_explicit_delivery() {
assert!(
streaming_completion_delivered(Ok(Ok(BroadcastDeliveryOutcome::Delivered))),
"an explicit Delivered outcome must be treated as a delivery"
);
assert!(
!streaming_completion_delivered(Ok(Ok(BroadcastDeliveryOutcome::Dropped))),
"an explicit Dropped outcome must NOT be treated as a delivery (#4235)"
);
assert!(
!streaming_completion_delivered(Ok(dropped_oneshot().await)),
"a dropped completion oneshot must NOT be treated as a delivery (#4235)"
);
assert!(
!streaming_completion_delivered(Err(elapsed_timeout().await)),
"a completion-wait timeout must NOT be treated as a delivery (#4235)"
);
}
#[tokio::test]
async fn drop_outcome_does_not_refresh_interest_or_cache_summary() {
let our_summary = StateSummary::from(vec![9, 9, 9, 9]);
let dropped = dropped_oneshot().await;
let timed_out = elapsed_timeout().await;
let cases: Vec<(&str, super::StreamCompletionResult, bool)> = vec![
(
"delivered",
Ok(Ok(BroadcastDeliveryOutcome::Delivered)),
true,
),
(
"explicit-drop",
Ok(Ok(BroadcastDeliveryOutcome::Dropped)),
false,
),
("dropped-oneshot", Ok(dropped), false),
("timeout", Err(timed_out), false),
];
for (name, completion, expect_delivered) in cases {
let time_source = SharedMockTimeSource::new();
let manager = InterestManager::new(time_source.clone());
let contract = make_contract_key(7);
let peer = make_peer_key();
manager.register_peer_interest(&contract, peer.clone(), None, false);
let baseline = manager
.get_peer_interest(&contract, &peer)
.expect("peer interest registered")
.last_refreshed;
time_source.advance_time(Duration::from_secs(5));
let delivered = record_streaming_delivery(
&manager,
completion,
true,
&contract,
&peer,
Some(&our_summary),
1024,
64,
);
assert_eq!(
delivered, expect_delivered,
"[{name}] classification mismatch"
);
let interest = manager
.get_peer_interest(&contract, &peer)
.expect("peer interest still registered");
if expect_delivered {
assert!(
interest.last_refreshed > baseline,
"[{name}] a real delivery MUST refresh the peer interest TTL"
);
assert_eq!(
manager.get_peer_summary(&contract, &peer),
Some(our_summary.clone()),
"[{name}] a real delivery MUST cache the peer summary"
);
} else {
assert_eq!(
interest.last_refreshed, baseline,
"[{name}] a dropped/timed-out broadcast MUST NOT refresh the \
peer interest TTL (#4235)"
);
assert_eq!(
manager.get_peer_summary(&contract, &peer),
None,
"[{name}] a dropped/timed-out broadcast MUST NOT cache the peer \
summary, or the next summary-mismatch resend is suppressed (#4235)"
);
}
}
}
#[tokio::test]
async fn full_state_delivery_caches_summary_so_next_broadcast_is_delta() {
let our_summary = StateSummary::from(vec![1, 2, 3, 4]);
let time_source = SharedMockTimeSource::new();
let manager = InterestManager::new(time_source.clone());
let contract = make_contract_key(42);
let peer = make_peer_key();
manager.register_peer_interest(&contract, peer.clone(), None, false);
assert_eq!(
manager.get_peer_summary(&contract, &peer),
None,
"precondition: a brand-new subscriber has no cached summary, so the \
first broadcast must be full state"
);
let delivered = record_streaming_delivery(
&manager,
Ok(Ok(BroadcastDeliveryOutcome::Delivered)),
false,
&contract,
&peer,
Some(&our_summary),
4096,
4096,
);
assert!(delivered, "a Delivered outcome must classify as delivered");
assert_eq!(
manager.get_peer_summary(&contract, &peer),
Some(our_summary.clone()),
"#4145: a delivered FULL-STATE broadcast must cache the peer summary, \
so the next broadcast can be a delta — otherwise the peer is trapped \
sending full state forever (the #4233 storm)"
);
let their_summary = manager.get_peer_summary(&contract, &peer);
assert!(
their_summary.is_some(),
"#4145: with a cached peer summary the next broadcast takes the delta \
path (compute_delta), not another full state"
);
}
#[tokio::test]
async fn dropped_full_state_stream_does_not_cache_summary() {
let our_summary = StateSummary::from(vec![5, 6, 7, 8]);
let dropped = dropped_oneshot().await;
let timed_out = elapsed_timeout().await;
let cases: Vec<(&str, super::StreamCompletionResult)> = vec![
("explicit-drop", Ok(Ok(BroadcastDeliveryOutcome::Dropped))),
("dropped-oneshot", Ok(dropped)),
("timeout", Err(timed_out)),
];
for (name, completion) in cases {
let time_source = SharedMockTimeSource::new();
let manager = InterestManager::new(time_source.clone());
let contract = make_contract_key(43);
let peer = make_peer_key();
manager.register_peer_interest(&contract, peer.clone(), None, false);
let delivered = record_streaming_delivery(
&manager,
completion,
false,
&contract,
&peer,
Some(&our_summary),
4096,
4096,
);
assert!(
!delivered,
"[{name}] a dropped/timed-out full-state stream must NOT classify \
as delivered"
);
assert_eq!(
manager.get_peer_summary(&contract, &peer),
None,
"[{name}] #4145 must not weaken the #2763/#4235 guard: a DROPPED \
full-state stream must NOT cache the summary (the peer never got \
the state), or the next summary-mismatch resend is suppressed"
);
}
}
}