// hashtree_network/pubsub_strategy.rs

//! Pure peer-traffic and pubsub fanout scheduling strategies.
//!
//! This module has no transport or runtime dependencies, so content-hash
//! responses, production pubsub, and deterministic simulations can all exercise
//! the same reciprocity ranking code.
6
7use std::collections::hash_map::DefaultHasher;
8use std::hash::{Hash, Hasher};
9
/// Strategy used to rank candidate peers before the fanout cut is applied.
///
/// `Hash` is derived alongside `Eq` so a policy can key hashed collections
/// (for example per-policy counters in a simulation).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PubsubSchedulingPolicy {
    /// Peers we have sent the fewest bytes to go first; ties break on peer id.
    Fair,
    /// Peers are ordered by a hash of the stream, sequence number, parent and
    /// peer id.
    Random,
    /// Peers with the highest reciprocity credit go first.
    Reciprocal,
    /// Peers with the lowest reciprocal virtual finish go first.
    ReciprocalDebt,
    /// Like `Reciprocal`, plus extra credit for every recorded deferral.
    AgingReciprocal,
}
18
19#[derive(Debug, Clone, PartialEq)]
20pub struct PubsubSchedulerConfig {
21    pub policy: PubsubSchedulingPolicy,
22    pub fanout: usize,
23    pub anonymous_free_credit_bytes: u64,
24    pub reciprocal_credit_multiplier: f64,
25    pub aging_credit_bytes: u64,
26}
27
28impl Default for PubsubSchedulerConfig {
29    fn default() -> Self {
30        Self {
31            policy: PubsubSchedulingPolicy::Reciprocal,
32            fanout: 8,
33            anonymous_free_credit_bytes: 4 * 1024,
34            reciprocal_credit_multiplier: 1.0,
35            aging_credit_bytes: 2 * 1024,
36        }
37    }
38}
39
/// Point-in-time traffic counters for one peer, used by the reciprocity
/// scoring functions in this module.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct PeerTrafficSnapshot {
    /// Bytes sent to the peer; more bytes sent lowers the peer's credit and
    /// upload weight.
    pub bytes_sent: u64,
    /// Raw bytes received from the peer, useful or not.
    pub bytes_received: u64,
    /// Received bytes that satisfied local or downstream demand.
    ///
    /// Raw ingress may be spam, so reciprocity should reward only bytes from
    /// content we requested, pubsub streams we subscribed to, or data we
    /// forwarded for a peer with an active downstream interest.
    pub useful_bytes_received: u64,
    /// Offset added to the weighted message cost when computing the peer's
    /// reciprocal virtual finish.
    pub bandwidth_debt: f64,
}
52
53#[derive(Debug, Clone, PartialEq)]
54pub struct PubsubCandidate {
55    pub peer_id: String,
56    pub traffic: PeerTrafficSnapshot,
57    pub deferred_count: u64,
58}
59
/// Outcome of one fanout decision: who receives the message now and who waits.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct PubsubSelection {
    /// Ids of the peers chosen for this message, best-ranked first.
    pub selected: Vec<String>,
    /// Ids of the peers that did not fit within the fanout, in rank order.
    pub deferred: Vec<String>,
}
65
/// A queued outbound message competing for the next send slot.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OutboundJobCandidate {
    /// Identifier echoed back in the selection when this job wins.
    pub job_id: u64,
    /// Peer the job is addressed to; its traffic snapshot drives the ranking.
    pub peer_id: String,
    /// Payload size used to compute the job's virtual finish.
    pub message_bytes: u64,
    /// Queue position; the lower value wins when virtual finishes are equal.
    pub queue_sequence: u64,
}
73
/// The outbound job chosen to run next.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct OutboundJobSelection {
    /// Identifier of the winning job.
    pub job_id: u64,
    /// Reciprocal virtual finish computed for the winning job.
    pub virtual_finish: f64,
}
79
80impl PubsubSchedulerConfig {
81    pub fn select(
82        &self,
83        stream_id: &str,
84        seq: u64,
85        parent_id: &str,
86        message_bytes: u64,
87        candidates: &[PubsubCandidate],
88    ) -> PubsubSelection {
89        let mut ranked = candidates.to_vec();
90        match self.policy {
91            PubsubSchedulingPolicy::Fair => {
92                ranked.sort_by(|left, right| {
93                    left.traffic
94                        .bytes_sent
95                        .cmp(&right.traffic.bytes_sent)
96                        .then_with(|| left.peer_id.cmp(&right.peer_id))
97                });
98            }
99            PubsubSchedulingPolicy::Random => {
100                ranked.sort_by_key(|candidate| {
101                    stable_pubsub_score(stream_id, seq, parent_id, &candidate.peer_id)
102                });
103            }
104            PubsubSchedulingPolicy::Reciprocal => {
105                ranked.sort_by(|left, right| {
106                    self.credit_score(right)
107                        .partial_cmp(&self.credit_score(left))
108                        .unwrap_or(std::cmp::Ordering::Equal)
109                        .then_with(|| left.traffic.bytes_sent.cmp(&right.traffic.bytes_sent))
110                        .then_with(|| left.peer_id.cmp(&right.peer_id))
111                });
112            }
113            PubsubSchedulingPolicy::ReciprocalDebt => {
114                ranked.sort_by(|left, right| {
115                    self.virtual_finish(left, message_bytes)
116                        .partial_cmp(&self.virtual_finish(right, message_bytes))
117                        .unwrap_or(std::cmp::Ordering::Equal)
118                        .then_with(|| left.peer_id.cmp(&right.peer_id))
119                });
120            }
121            PubsubSchedulingPolicy::AgingReciprocal => {
122                ranked.sort_by(|left, right| {
123                    self.aging_score(right)
124                        .partial_cmp(&self.aging_score(left))
125                        .unwrap_or(std::cmp::Ordering::Equal)
126                        .then_with(|| left.traffic.bytes_sent.cmp(&right.traffic.bytes_sent))
127                        .then_with(|| left.peer_id.cmp(&right.peer_id))
128                });
129            }
130        }
131
132        let capacity = self.fanout.min(ranked.len());
133        let deferred = ranked
134            .iter()
135            .skip(capacity)
136            .map(|candidate| candidate.peer_id.clone())
137            .collect();
138        let selected = ranked
139            .into_iter()
140            .take(capacity)
141            .map(|candidate| candidate.peer_id)
142            .collect();
143        PubsubSelection { selected, deferred }
144    }
145
146    fn credit_score(&self, candidate: &PubsubCandidate) -> f64 {
147        self.anonymous_free_credit_bytes as f64
148            + candidate.traffic.useful_bytes_received as f64 * self.reciprocal_credit_multiplier
149            - candidate.traffic.bytes_sent as f64
150    }
151
152    fn aging_score(&self, candidate: &PubsubCandidate) -> f64 {
153        self.credit_score(candidate)
154            + candidate.deferred_count as f64 * self.aging_credit_bytes as f64
155    }
156
157    pub fn virtual_finish(&self, candidate: &PubsubCandidate, message_bytes: u64) -> f64 {
158        reciprocal_virtual_finish(candidate.traffic, message_bytes)
159    }
160}
161
162pub fn reciprocal_upload_weight(traffic: PeerTrafficSnapshot) -> f64 {
163    let raw_ratio = (traffic.bytes_received.saturating_add(1024) as f64)
164        / (traffic.bytes_sent.saturating_add(1024) as f64);
165    let useful_ratio = (traffic.useful_bytes_received.saturating_add(1024) as f64)
166        / (traffic.bytes_sent.saturating_add(1024) as f64);
167    let raw_ratio = raw_ratio.min(useful_ratio);
168    let bounded_ratio = raw_ratio / (1.0 + raw_ratio);
169    0.5 + 1.5 * bounded_ratio
170}
171
172pub fn reciprocal_virtual_finish(traffic: PeerTrafficSnapshot, message_bytes: u64) -> f64 {
173    traffic.bandwidth_debt + message_bytes as f64 / reciprocal_upload_weight(traffic)
174}
175
176pub fn select_reciprocal_outbound_job(
177    jobs: &[OutboundJobCandidate],
178    traffic_for_peer: impl Fn(&str) -> PeerTrafficSnapshot,
179) -> Option<OutboundJobSelection> {
180    jobs.iter()
181        .map(|job| {
182            let traffic = traffic_for_peer(&job.peer_id);
183            (
184                OutboundJobSelection {
185                    job_id: job.job_id,
186                    virtual_finish: reciprocal_virtual_finish(traffic, job.message_bytes),
187                },
188                job.queue_sequence,
189                job.peer_id.as_str(),
190            )
191        })
192        .min_by(|left, right| {
193            left.0
194                .virtual_finish
195                .partial_cmp(&right.0.virtual_finish)
196                .unwrap_or(std::cmp::Ordering::Equal)
197                .then_with(|| left.1.cmp(&right.1))
198                .then_with(|| left.2.cmp(right.2))
199        })
200        .map(|choice| choice.0)
201}
202
/// Hash-derived score used to order peers under the `Random` policy.
///
/// Feeds `stream_id`, `seq`, `parent_id` and `peer_id`, in that order, into a
/// freshly constructed `DefaultHasher` and returns the resulting digest, so
/// equal inputs always produce the same score within one build.
pub fn stable_pubsub_score(stream_id: &str, seq: u64, parent_id: &str, peer_id: &str) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(stream_id, &mut state);
    Hash::hash(&seq, &mut state);
    Hash::hash(parent_id, &mut state);
    Hash::hash(peer_id, &mut state);
    Hasher::finish(&state)
}
211
#[cfg(test)]
mod tests {
    use super::*;

    const KIB: u64 = 1024;

    /// Builds a candidate whose received bytes all count as useful.
    fn candidate(
        peer_id: &str,
        bytes_sent: u64,
        bytes_received: u64,
        bandwidth_debt: f64,
        deferred_count: u64,
    ) -> PubsubCandidate {
        let traffic = PeerTrafficSnapshot {
            bytes_sent,
            bytes_received,
            useful_bytes_received: bytes_received,
            bandwidth_debt,
        };
        PubsubCandidate {
            peer_id: peer_id.to_string(),
            traffic,
            deferred_count,
        }
    }

    /// Builds a snapshot that carries no bandwidth debt.
    fn debt_free(
        bytes_sent: u64,
        bytes_received: u64,
        useful_bytes_received: u64,
    ) -> PeerTrafficSnapshot {
        PeerTrafficSnapshot {
            bytes_sent,
            bytes_received,
            useful_bytes_received,
            bandwidth_debt: 0.0,
        }
    }

    /// Config with a single fanout slot, no free credit and a 1.0 multiplier.
    fn single_slot(
        policy: PubsubSchedulingPolicy,
        aging_credit_bytes: u64,
    ) -> PubsubSchedulerConfig {
        PubsubSchedulerConfig {
            policy,
            fanout: 1,
            anonymous_free_credit_bytes: 0,
            reciprocal_credit_multiplier: 1.0,
            aging_credit_bytes,
        }
    }

    /// Runs a selection for the fixed stream/sequence/parent used by all tests.
    fn fan_out(config: &PubsubSchedulerConfig, candidates: &[PubsubCandidate]) -> PubsubSelection {
        config.select("author", 1, "parent", KIB, candidates)
    }

    #[test]
    fn reciprocal_policy_prioritizes_peers_that_have_served_bandwidth() {
        let config = single_slot(PubsubSchedulingPolicy::Reciprocal, 0);
        let peers = [
            candidate("leecher", 0, 0, 0.0, 0),
            candidate("provider", 0, 64 * KIB, 0.0, 0),
        ];

        let selection = fan_out(&config, &peers);

        assert_eq!(selection.selected, vec!["provider"]);
        assert_eq!(selection.deferred, vec!["leecher"]);
    }

    #[test]
    fn aging_reciprocal_eventually_schedules_deferred_peers() {
        let config = single_slot(PubsubSchedulingPolicy::AgingReciprocal, 16 * KIB);
        let peers = [
            candidate("fresh-provider", 0, 64 * KIB, 0.0, 0),
            candidate("starved-peer", 0, 0, 0.0, 5),
        ];

        let selection = fan_out(&config, &peers);

        assert_eq!(selection.selected, vec!["starved-peer"]);
        assert_eq!(selection.deferred, vec!["fresh-provider"]);
    }

    #[test]
    fn reciprocal_debt_matches_content_hash_response_scheduler_shape() {
        let config = single_slot(PubsubSchedulingPolicy::ReciprocalDebt, 0);
        let peers = [
            candidate("already-served", 64 * KIB, 64 * KIB, 16_000.0, 0),
            candidate("low-debt-provider", 0, 64 * KIB, 0.0, 0),
        ];

        let selection = fan_out(&config, &peers);

        assert_eq!(selection.selected, vec!["low-debt-provider"]);
        assert_eq!(selection.deferred, vec!["already-served"]);
    }

    #[test]
    fn reciprocal_upload_weight_prefers_peers_who_sent_us_more() {
        let provider = reciprocal_upload_weight(debt_free(KIB, 64 * KIB, 64 * KIB));
        let leecher = reciprocal_upload_weight(debt_free(64 * KIB, KIB, KIB));

        assert!(provider > leecher);
    }

    #[test]
    fn reciprocal_upload_weight_ignores_useless_ingress_spam() {
        let spammer = reciprocal_upload_weight(debt_free(64 * KIB, 512 * KIB, 0));
        let useful = reciprocal_upload_weight(debt_free(64 * KIB, 64 * KIB, 64 * KIB));

        assert!(useful > spammer);
    }

    #[test]
    fn reciprocal_outbound_job_scheduler_uses_shared_traffic_snapshot() {
        let queued = |job_id: u64, peer_id: &str| OutboundJobCandidate {
            job_id,
            peer_id: peer_id.to_string(),
            message_bytes: 4096,
            queue_sequence: job_id,
        };
        let jobs = vec![queued(1, "spammer"), queued(2, "useful")];

        let selected = select_reciprocal_outbound_job(&jobs, |peer_id| {
            if peer_id == "useful" {
                debt_free(KIB, 16 * KIB, 16 * KIB)
            } else {
                debt_free(KIB, 512 * KIB, 0)
            }
        })
        .expect("selected job");

        assert_eq!(selected.job_id, 2);
    }
}