1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#![allow(
clippy::cast_precision_loss,
reason = "M175: EWMA — f64 precision intentional for rate calculation"
)]
//! Per-peer download-rate EWMA, maintained actor-side and fed by block
//! receipts (`process_block_completion`) + the 1 s `pipeline_tick`.
//!
//! History: M104 removed the peer-task AIMD depth system; M257f removed
//! the actor-side RTT `request_times` map that M104 had orphaned (the
//! send-side registration was deleted with the peer-task state, so the
//! map stayed permanently empty — `avg_rtt` was never populated between
//! M104 and M257f). RTT is now measured where both directions are
//! visible: the peer task pairs Request sends with Piece receipts via
//! `PeerShared::request_sent_times` and ships the sample to the actor
//! on each `BlockEntry`.
/// Actor-side throughput tracker for a single peer: a byte counter for the
/// window being filled, the total of the last finished window, and an EWMA
/// over finished windows.
pub(crate) struct PeerPipelineState {
    /// Running byte total for the window currently in progress (feeds the
    /// throughput stats); reset to zero by `tick()`.
    last_second_bytes: u64,
    /// Byte total of the most recently COMPLETED window, captured by
    /// `tick()`. This — not the EWMA — is what the BDP depth cap consumes.
    /// A raw 1 s window reports the depth-limited delivery rate exactly
    /// as soon as the first tick lands, whereas the EWMA climbs a
    /// 1−0.7^k convergence staircase and sits below that rate for
    /// ~9 ticks; each step of the staircase is a shrink/grow knife-edge
    /// (M257f evidence run 3: bimodal high-BDP outcomes caused by
    /// RTT jitter flipping the tick-3 boundary). Single-window noise is
    /// the job of the cap's 3-confirm shrink hysteresis.
    last_window_bytes: u64,
    /// Exponentially weighted moving average of throughput, in bytes/sec.
    ewma_rate_bytes_sec: f64,
}
impl PeerPipelineState {
    /// Build a tracker with no traffic recorded: both byte counters and the
    /// EWMA start at zero.
    pub fn new() -> Self {
        Self {
            ewma_rate_bytes_sec: 0.0,
            last_window_bytes: 0,
            last_second_bytes: 0,
        }
    }

    /// Smoothed throughput in bytes/sec as of the latest `tick()` (used for
    /// stats/reporting).
    pub fn ewma_rate(&self) -> f64 {
        self.ewma_rate_bytes_sec
    }

    /// Byte total of the last completed 1 s window, i.e. an unsmoothed
    /// rate in B/s with no convergence lag. This is the BDP cap's input;
    /// the `last_window_bytes` field doc explains why the raw window is
    /// used instead of the EWMA.
    pub fn last_window_rate(&self) -> u64 {
        self.last_window_bytes
    }

    /// Test-only: force the EWMA to an exact value (integration tests
    /// assert exact quota-share arithmetic).
    #[cfg(test)]
    pub(crate) fn set_ewma_for_test(&mut self, rate: f64) {
        self.ewma_rate_bytes_sec = rate;
    }

    /// Test-only: force the completed-window byte count (integration tests
    /// assert exact BDP-cap arithmetic).
    #[cfg(test)]
    pub(crate) fn set_window_for_test(&mut self, bytes: u64) {
        self.last_window_bytes = bytes;
    }

    /// Add one received block of `length` bytes to the window that is
    /// currently being filled.
    pub fn block_received(&mut self, length: u32) {
        let received: u64 = length.into();
        self.last_second_bytes += received;
    }

    /// Close the current window: publish its byte total as the completed
    /// window, blend it into the EWMA, and start a new empty window.
    ///
    /// The method does not measure elapsed time itself, so the rates are
    /// only in bytes/sec when the caller invokes it once per second (the
    /// module docs describe a 1 s `pipeline_tick`).
    pub fn tick(&mut self) {
        // Weight of the newest window; the previous estimate keeps 1 − ALPHA.
        const ALPHA: f64 = 0.3;

        // Snapshot and reset the in-progress counter before blending.
        let window = self.last_second_bytes;
        self.last_second_bytes = 0;
        self.last_window_bytes = window;

        let fresh = ALPHA * window as f64;
        let carried = (1.0 - ALPHA) * self.ewma_rate_bytes_sec;
        self.ewma_rate_bytes_sec = fresh + carried;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `tick()` publishes the just-finished window's byte total, and an
    /// empty window overwrites the previous value rather than keeping it.
    #[test]
    fn last_window_rate_stashes_completed_window() {
        let mut p = PeerPipelineState::new();
        assert_eq!(p.last_window_rate(), 0, "no window completed yet");
        p.block_received(16384);
        p.block_received(16384);
        p.tick();
        assert_eq!(p.last_window_rate(), 32768, "completed window stashed");
        p.tick();
        assert_eq!(p.last_window_rate(), 0, "empty window replaces stash");
    }

    /// After AIMD removal, `tick()` only updates the EWMA: it rises when
    /// data arrived and decays (without dropping to zero) on an idle tick.
    #[test]
    fn pipeline_tick_no_depth_adjustment() {
        let mut p = PeerPipelineState::new();
        // Simulate receiving some data
        p.block_received(16384);
        p.block_received(16384);
        p.tick();
        // EWMA should be non-zero after receiving data
        let rate_before_decay = p.ewma_rate();
        assert!(rate_before_decay > 0.0);
        // Tick again with no data — EWMA decays. Compare against the value
        // captured BEFORE the idle tick; comparing the new value with
        // itself would hold no matter what `tick()` did.
        p.tick();
        let rate_after_decay = p.ewma_rate();
        assert!(
            rate_after_decay < rate_before_decay,
            "EWMA should shrink on an idle tick"
        );
        assert!(
            rate_after_decay > 0.0,
            "a single idle tick decays the EWMA, it does not reset it"
        );
    }

    /// The EWMA starts at zero, responds to traffic, holds up under steady
    /// traffic, and decays once traffic stops.
    #[test]
    #[allow(clippy::float_cmp, reason = "exact sentinel value comparison")]
    fn pipeline_ewma_still_tracks() {
        let mut p = PeerPipelineState::new();
        assert_eq!(p.ewma_rate(), 0.0);
        // Feed data and tick
        p.block_received(100_000);
        p.tick();
        let rate1 = p.ewma_rate();
        assert!(rate1 > 0.0, "EWMA should be positive after receiving data");
        // Feed more data and tick
        p.block_received(100_000);
        p.tick();
        let rate2 = p.ewma_rate();
        assert!(
            rate2 > rate1 * 0.5,
            "EWMA should maintain with steady traffic"
        );
        // No data, tick — should decay
        p.tick();
        let rate3 = p.ewma_rate();
        assert!(rate3 < rate2, "EWMA should decay without data");
    }
}