1use std::collections::hash_map::DefaultHasher;
8use std::hash::{Hash, Hasher};
9
/// Ranking strategy applied by [`PubsubSchedulerConfig::select`] when choosing which
/// candidate peers receive a message now and which are deferred.
///
/// `Hash` is derived alongside `Eq` so a policy can be used directly as a key in
/// `HashMap`/`HashSet` (for example, per-policy counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PubsubSchedulingPolicy {
    /// Rank by ascending `bytes_sent`, then by peer id.
    Fair,
    /// Rank by a deterministic hash of stream id, sequence number, parent id and peer id.
    Random,
    /// Rank by descending credit score (free credit plus weighted useful bytes received,
    /// minus bytes sent), then by ascending `bytes_sent`, then by peer id.
    Reciprocal,
    /// Rank by ascending virtual finish (`bandwidth_debt` plus the weighted cost of the
    /// message), then by peer id.
    ReciprocalDebt,
    /// Same ranking as `Reciprocal`, with `aging_credit_bytes` of extra credit added per
    /// unit of the candidate's `deferred_count`.
    AgingReciprocal,
}
18
19#[derive(Debug, Clone, PartialEq)]
20pub struct PubsubSchedulerConfig {
21 pub policy: PubsubSchedulingPolicy,
22 pub fanout: usize,
23 pub anonymous_free_credit_bytes: u64,
24 pub reciprocal_credit_multiplier: f64,
25 pub aging_credit_bytes: u64,
26}
27
28impl Default for PubsubSchedulerConfig {
29 fn default() -> Self {
30 Self {
31 policy: PubsubSchedulingPolicy::Reciprocal,
32 fanout: 8,
33 anonymous_free_credit_bytes: 4 * 1024,
34 reciprocal_credit_multiplier: 1.0,
35 aging_credit_bytes: 2 * 1024,
36 }
37 }
38}
39
/// Point-in-time traffic counters for one peer, consumed by the reciprocal schedulers.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct PeerTrafficSnapshot {
    /// Bytes sent; subtracted from the credit score and used as the denominator of the
    /// upload-weight ratios. Presumably bytes this node sent to the peer (confirm with caller).
    pub bytes_sent: u64,
    /// Bytes received; numerator of the raw upload-weight ratio.
    pub bytes_received: u64,
    /// Portion of received bytes that counts toward credit; it caps the upload-weight ratio,
    /// so received bytes that were not useful earn no extra weight.
    pub useful_bytes_received: u64,
    /// Starting offset added to the weighted message cost when computing a virtual finish.
    pub bandwidth_debt: f64,
}
52
53#[derive(Debug, Clone, PartialEq)]
54pub struct PubsubCandidate {
55 pub peer_id: String,
56 pub traffic: PeerTrafficSnapshot,
57 pub deferred_count: u64,
58}
59
/// Outcome of one scheduling pass: every candidate's peer id ends up in exactly one list.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct PubsubSelection {
    /// Peer ids chosen for this pass, best-ranked first.
    pub selected: Vec<String>,
    /// Peer ids that did not fit within the fanout, in ranking order.
    pub deferred: Vec<String>,
}
65
/// A queued outbound message considered by the reciprocal outbound job selector.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OutboundJobCandidate {
    /// Identifier reported back in the selection.
    pub job_id: u64,
    /// Peer the job is addressed to; used to look up that peer's traffic snapshot.
    pub peer_id: String,
    /// Size of the message, in bytes, used as the cost of the job.
    pub message_bytes: u64,
    /// Tie-breaker between jobs with equal virtual finish: the lower value wins.
    pub queue_sequence: u64,
}
73
/// The job picked by the reciprocal outbound job selector.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct OutboundJobSelection {
    /// Identifier of the chosen job.
    pub job_id: u64,
    /// Virtual finish value the job was ranked by.
    pub virtual_finish: f64,
}
79
80impl PubsubSchedulerConfig {
81 pub fn select(
82 &self,
83 stream_id: &str,
84 seq: u64,
85 parent_id: &str,
86 message_bytes: u64,
87 candidates: &[PubsubCandidate],
88 ) -> PubsubSelection {
89 let mut ranked = candidates.to_vec();
90 match self.policy {
91 PubsubSchedulingPolicy::Fair => {
92 ranked.sort_by(|left, right| {
93 left.traffic
94 .bytes_sent
95 .cmp(&right.traffic.bytes_sent)
96 .then_with(|| left.peer_id.cmp(&right.peer_id))
97 });
98 }
99 PubsubSchedulingPolicy::Random => {
100 ranked.sort_by_key(|candidate| {
101 stable_pubsub_score(stream_id, seq, parent_id, &candidate.peer_id)
102 });
103 }
104 PubsubSchedulingPolicy::Reciprocal => {
105 ranked.sort_by(|left, right| {
106 self.credit_score(right)
107 .partial_cmp(&self.credit_score(left))
108 .unwrap_or(std::cmp::Ordering::Equal)
109 .then_with(|| left.traffic.bytes_sent.cmp(&right.traffic.bytes_sent))
110 .then_with(|| left.peer_id.cmp(&right.peer_id))
111 });
112 }
113 PubsubSchedulingPolicy::ReciprocalDebt => {
114 ranked.sort_by(|left, right| {
115 self.virtual_finish(left, message_bytes)
116 .partial_cmp(&self.virtual_finish(right, message_bytes))
117 .unwrap_or(std::cmp::Ordering::Equal)
118 .then_with(|| left.peer_id.cmp(&right.peer_id))
119 });
120 }
121 PubsubSchedulingPolicy::AgingReciprocal => {
122 ranked.sort_by(|left, right| {
123 self.aging_score(right)
124 .partial_cmp(&self.aging_score(left))
125 .unwrap_or(std::cmp::Ordering::Equal)
126 .then_with(|| left.traffic.bytes_sent.cmp(&right.traffic.bytes_sent))
127 .then_with(|| left.peer_id.cmp(&right.peer_id))
128 });
129 }
130 }
131
132 let capacity = self.fanout.min(ranked.len());
133 let deferred = ranked
134 .iter()
135 .skip(capacity)
136 .map(|candidate| candidate.peer_id.clone())
137 .collect();
138 let selected = ranked
139 .into_iter()
140 .take(capacity)
141 .map(|candidate| candidate.peer_id)
142 .collect();
143 PubsubSelection { selected, deferred }
144 }
145
146 fn credit_score(&self, candidate: &PubsubCandidate) -> f64 {
147 self.anonymous_free_credit_bytes as f64
148 + candidate.traffic.useful_bytes_received as f64 * self.reciprocal_credit_multiplier
149 - candidate.traffic.bytes_sent as f64
150 }
151
152 fn aging_score(&self, candidate: &PubsubCandidate) -> f64 {
153 self.credit_score(candidate)
154 + candidate.deferred_count as f64 * self.aging_credit_bytes as f64
155 }
156
157 pub fn virtual_finish(&self, candidate: &PubsubCandidate, message_bytes: u64) -> f64 {
158 reciprocal_virtual_finish(candidate.traffic, message_bytes)
159 }
160}
161
162pub fn reciprocal_upload_weight(traffic: PeerTrafficSnapshot) -> f64 {
163 let raw_ratio = (traffic.bytes_received.saturating_add(1024) as f64)
164 / (traffic.bytes_sent.saturating_add(1024) as f64);
165 let useful_ratio = (traffic.useful_bytes_received.saturating_add(1024) as f64)
166 / (traffic.bytes_sent.saturating_add(1024) as f64);
167 let raw_ratio = raw_ratio.min(useful_ratio);
168 let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
169 0.5 + 1.5 * bounded_ratio
170}
171
172pub fn reciprocal_virtual_finish(traffic: PeerTrafficSnapshot, message_bytes: u64) -> f64 {
173 traffic.bandwidth_debt + message_bytes as f64 / reciprocal_upload_weight(traffic)
174}
175
176pub fn select_reciprocal_outbound_job(
177 jobs: &[OutboundJobCandidate],
178 traffic_for_peer: impl Fn(&str) -> PeerTrafficSnapshot,
179) -> Option<OutboundJobSelection> {
180 jobs.iter()
181 .map(|job| {
182 let traffic = traffic_for_peer(&job.peer_id);
183 (
184 OutboundJobSelection {
185 job_id: job.job_id,
186 virtual_finish: reciprocal_virtual_finish(traffic, job.message_bytes),
187 },
188 job.queue_sequence,
189 job.peer_id.as_str(),
190 )
191 })
192 .min_by(|left, right| {
193 left.0
194 .virtual_finish
195 .partial_cmp(&right.0.virtual_finish)
196 .unwrap_or(std::cmp::Ordering::Equal)
197 .then_with(|| left.1.cmp(&right.1))
198 .then_with(|| left.2.cmp(right.2))
199 })
200 .map(|choice| choice.0)
201}
202
/// Deterministic score for a (stream, sequence, parent, peer) combination.
///
/// Feeds `stream_id`, `seq`, `parent_id` and `peer_id`, in that order, into a fresh
/// `DefaultHasher` and returns the resulting 64-bit hash.
///
/// NOTE(review): the standard library does not specify `DefaultHasher`'s algorithm, so
/// scores are repeatable within one build but may differ between Rust releases. Confirm
/// that nothing compares these scores across builds or nodes.
pub fn stable_pubsub_score(stream_id: &str, seq: u64, parent_id: &str, peer_id: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(stream_id, &mut state);
    Hash::hash(&seq, &mut state);
    Hash::hash(parent_id, &mut state);
    Hash::hash(peer_id, &mut state);
    Hasher::finish(&state)
}
211
#[cfg(test)]
mod tests {
    use super::*;

    const KIB: u64 = 1024;

    /// Snapshot with the given counters and no bandwidth debt.
    fn debt_free_traffic(
        bytes_sent: u64,
        bytes_received: u64,
        useful_bytes_received: u64,
    ) -> PeerTrafficSnapshot {
        PeerTrafficSnapshot {
            bytes_sent,
            bytes_received,
            useful_bytes_received,
            bandwidth_debt: 0.0,
        }
    }

    /// Candidate whose received bytes all count as useful.
    fn candidate(
        peer_id: &str,
        bytes_sent: u64,
        bytes_received: u64,
        bandwidth_debt: f64,
        deferred_count: u64,
    ) -> PubsubCandidate {
        let traffic = PeerTrafficSnapshot {
            bandwidth_debt,
            ..debt_free_traffic(bytes_sent, bytes_received, bytes_received)
        };
        PubsubCandidate {
            peer_id: peer_id.to_string(),
            traffic,
            deferred_count,
        }
    }

    /// One-slot scheduler with no free credit, so only traffic and aging decide the ranking.
    fn single_slot_config(
        policy: PubsubSchedulingPolicy,
        aging_credit_bytes: u64,
    ) -> PubsubSchedulerConfig {
        PubsubSchedulerConfig {
            policy,
            fanout: 1,
            anonymous_free_credit_bytes: 0,
            reciprocal_credit_multiplier: 1.0,
            aging_credit_bytes,
        }
    }

    /// Runs a selection for the fixed stream/message used throughout these tests.
    fn schedule(config: &PubsubSchedulerConfig, candidates: &[PubsubCandidate]) -> PubsubSelection {
        config.select("author", 1, "parent", 1024, candidates)
    }

    /// Outbound job carrying a 4 KiB message.
    fn outbound_job(job_id: u64, peer_id: &str, queue_sequence: u64) -> OutboundJobCandidate {
        OutboundJobCandidate {
            job_id,
            peer_id: peer_id.to_string(),
            message_bytes: 4096,
            queue_sequence,
        }
    }

    #[test]
    fn reciprocal_policy_prioritizes_peers_that_have_served_bandwidth() {
        let config = single_slot_config(PubsubSchedulingPolicy::Reciprocal, 0);
        let peers = [
            candidate("leecher", 0, 0, 0.0, 0),
            candidate("provider", 0, 64 * KIB, 0.0, 0),
        ];

        let selection = schedule(&config, &peers);

        assert_eq!(selection.selected, vec!["provider"]);
        assert_eq!(selection.deferred, vec!["leecher"]);
    }

    #[test]
    fn aging_reciprocal_eventually_schedules_deferred_peers() {
        let config = single_slot_config(PubsubSchedulingPolicy::AgingReciprocal, 16 * KIB);
        let peers = [
            candidate("fresh-provider", 0, 64 * KIB, 0.0, 0),
            candidate("starved-peer", 0, 0, 0.0, 5),
        ];

        let selection = schedule(&config, &peers);

        assert_eq!(selection.selected, vec!["starved-peer"]);
        assert_eq!(selection.deferred, vec!["fresh-provider"]);
    }

    #[test]
    fn reciprocal_debt_matches_content_hash_response_scheduler_shape() {
        let config = single_slot_config(PubsubSchedulingPolicy::ReciprocalDebt, 0);
        let peers = [
            candidate("already-served", 64 * KIB, 64 * KIB, 16_000.0, 0),
            candidate("low-debt-provider", 0, 64 * KIB, 0.0, 0),
        ];

        let selection = schedule(&config, &peers);

        assert_eq!(selection.selected, vec!["low-debt-provider"]);
        assert_eq!(selection.deferred, vec!["already-served"]);
    }

    #[test]
    fn reciprocal_upload_weight_prefers_peers_who_sent_us_more() {
        let provider = reciprocal_upload_weight(debt_free_traffic(KIB, 64 * KIB, 64 * KIB));
        let leecher = reciprocal_upload_weight(debt_free_traffic(64 * KIB, KIB, KIB));

        assert!(provider > leecher);
    }

    #[test]
    fn reciprocal_upload_weight_ignores_useless_ingress_spam() {
        let spammer = reciprocal_upload_weight(debt_free_traffic(64 * KIB, 512 * KIB, 0));
        let useful = reciprocal_upload_weight(debt_free_traffic(64 * KIB, 64 * KIB, 64 * KIB));

        assert!(useful > spammer);
    }

    #[test]
    fn reciprocal_outbound_job_scheduler_uses_shared_traffic_snapshot() {
        let jobs = vec![outbound_job(1, "spammer", 1), outbound_job(2, "useful", 2)];
        let traffic_for_peer = |peer_id: &str| match peer_id {
            "useful" => debt_free_traffic(KIB, 16 * KIB, 16 * KIB),
            _ => debt_free_traffic(KIB, 512 * KIB, 0),
        };

        let selected =
            select_reciprocal_outbound_job(&jobs, traffic_for_peer).expect("selected job");

        assert_eq!(selected.job_id, 2);
    }
}