freenet 0.2.82

Freenet core software
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
use crate::message::Transaction;
use dashmap::{DashMap, DashSet};
use std::net::SocketAddr;
use std::sync::Arc;

/// Tracks live transactions per peer address.
///
/// Uses `SocketAddr` as the key since transactions are tied to network connections,
/// not cryptographic identities.
///
/// Maintains a reverse index (tx -> peer) for O(1) transaction removal instead of
/// O(n) full-map iteration. This significantly reduces lock contention under load.
#[derive(Clone)]
pub struct LiveTransactionTracker {
    tx_per_peer: Arc<DashMap<SocketAddr, Vec<Transaction>>>,
    /// Reverse index: Transaction -> SocketAddr for O(1) lookup during removal.
    /// Without this, remove_finished_transaction would need to iterate all peers.
    peer_for_tx: Arc<DashMap<Transaction, SocketAddr>>,
    /// CONNECT transactions this node *initiated itself* via `Ring::acquire_new`
    /// (i.e. genuine connection-acquisition attempts), as opposed to CONNECTs it
    /// is merely relaying for other peers.
    ///
    /// `connection_maintenance` throttles concurrent acquisition attempts on
    /// this count. It must NOT include relayed CONNECTs: a relay holds a
    /// `tx_per_peer` entry for each CONNECT it forwards (registered in
    /// `p2p_protoc`) until the op completes or hits the 60s TTL, so counting
    /// those would let a relay-heavy node's own acquisition budget be consumed
    /// by other peers' traffic — once at/above `min_connections` the budget is
    /// only `BASE = 3`, so a few relayed CONNECTs would stall the node's own
    /// growth flat just above `min_connections` (#4348). An entry is cleared
    /// only by `remove_finished_transaction` (NOT by `prune_transactions_from_peer`):
    /// an acquisition stays "in flight" across peer churn until its driver
    /// actually finishes. The acquisition driver always reaches
    /// `remove_finished_transaction` — on completion, on an orphan wake
    /// (`handle_orphaned_transactions` after a peer disconnect), or via the
    /// driver's 60s `OPERATION_TTL` exit (`should_exit_for_ttl` →
    /// `release_pending_op_slot` → `TransactionCompleted`) — so the set cannot
    /// leak, and tying cleanup to completion alone keeps the gauge race-free
    /// under concurrent rebinds.
    acquisition_txs: Arc<DashSet<Transaction>>,
}

impl LiveTransactionTracker {
    pub fn add_transaction(&self, peer_addr: SocketAddr, tx: Transaction) {
        // Insert to reverse index first to prevent race condition:
        // If remove_finished_transaction runs concurrently, it will find the tx
        // in peer_for_tx and clean up properly. If we did tx_per_peer first,
        // a concurrent remove could miss the tx in peer_for_tx and leave orphans.
        //
        // NOTE: this is the pre-#4154 behavior. Re-registering the same
        // `tx` against multiple peers leaves stale entries in the older
        // peers' `tx_per_peer` Vec. Rebind-safe semantics were attempted
        // in PR #4164 but interacted badly with topology maintenance:
        // the "inflated" Vec entries acted as a soft signal that masked
        // peers from neighbor consideration, and tightening that
        // semantic caused `test_six_peer_contract_lifecycle` to diverge
        // (CRDT broadcast missed one peer). Until topology maintenance
        // is decoupled from `has_live_connection`, keep the loose
        // semantics here.
        self.peer_for_tx.insert(tx, peer_addr);
        self.tx_per_peer.entry(peer_addr).or_default().push(tx);
    }

    pub fn remove_finished_transaction(&self, tx: Transaction) {
        // Clear the acquisition gauge in lockstep with the live-tx removal so a
        // completed acquisition frees its maintenance-throttle slot. No-op for
        // relayed CONNECTs (never registered as acquisitions).
        self.acquisition_txs.remove(&tx);
        // O(1) lookup using reverse index instead of O(n) full-map iteration
        if let Some((_, peer_addr)) = self.peer_for_tx.remove(&tx) {
            self.tx_per_peer.remove_if_mut(&peer_addr, |_, v| {
                v.retain(|otx| otx != &tx);
                v.is_empty()
            });
        }
    }

    /// Mark `tx` as a self-initiated connection-acquisition attempt so it counts
    /// toward the `connection_maintenance` concurrency throttle. Call this only
    /// from `Ring::acquire_new`; relayed CONNECTs must NOT be registered here.
    pub(crate) fn register_acquisition(&self, tx: Transaction) {
        self.acquisition_txs.insert(tx);
    }

    pub(crate) fn new() -> Self {
        Self {
            tx_per_peer: Arc::new(DashMap::default()),
            peer_for_tx: Arc::new(DashMap::default()),
            acquisition_txs: Arc::new(DashSet::default()),
        }
    }

    /// Prune all transactions associated with a peer and return them.
    ///
    /// Returns the list of transactions that were associated with this peer,
    /// allowing callers to handle them appropriately (e.g., retry via alternate routes).
    pub(crate) fn prune_transactions_from_peer(&self, peer_addr: SocketAddr) -> Vec<Transaction> {
        // Remove all transactions for this peer from the reverse index.
        //
        // NOTE: this deliberately does NOT touch `acquisition_txs`. A pruned
        // peer means a disconnect, not acquisition completion — the orphaned
        // acquisition's driver is woken (`handle_orphaned_transactions`) and
        // retries or fails, and either way reaches `remove_finished_transaction`
        // (or the 60s TTL backstop), which clears the gauge. Removing here would
        // (a) undercount a tx that was rebound to a still-live peer, and
        // (b) race a concurrent `add_transaction` rebind between the owner check
        // and the remove. Leaving acquisition cleanup solely to
        // `remove_finished_transaction` is race-free and, during the brief
        // orphan window, errs toward over-counting — the safe direction for a
        // concurrency throttle (#4348 review).
        if let Some((_, txs)) = self.tx_per_peer.remove(&peer_addr) {
            for tx in &txs {
                self.peer_for_tx.remove(tx);
            }
            txs
        } else {
            Vec::new()
        }
    }

    pub(crate) fn has_live_connection(&self, peer_addr: SocketAddr) -> bool {
        self.tx_per_peer.contains_key(&peer_addr)
    }

    pub(crate) fn len(&self) -> usize {
        self.tx_per_peer.len()
    }

    /// Returns the total number of active transactions across all peers.
    #[cfg(test)]
    pub(crate) fn active_transaction_count(&self) -> usize {
        self.tx_per_peer
            .iter()
            .map(|entry| entry.value().len())
            .sum()
    }

    /// Returns the number of in-flight connection-acquisition attempts this node
    /// initiated itself (via `Ring::acquire_new`). `connection_maintenance` uses
    /// this to throttle concurrent acquisitions.
    ///
    /// This deliberately excludes CONNECTs the node is relaying for other peers
    /// (those are tracked in `tx_per_peer` for cancellation/`has_live_connection`
    /// but never registered as acquisitions), so relay load cannot consume the
    /// node's own acquisition budget — see the `acquisition_txs` field (#4348).
    pub(crate) fn active_acquisition_transaction_count(&self) -> usize {
        self.acquisition_txs.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::operations::connect::ConnectMsg;
    use crate::operations::get::GetMsg;
    use crate::operations::put::PutMsg;

    #[test]
    fn active_transaction_count_empty() {
        let tracker = LiveTransactionTracker::new();
        assert_eq!(tracker.active_transaction_count(), 0);
    }

    #[test]
    fn active_transaction_count_single_peer() {
        let tracker = LiveTransactionTracker::new();
        let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();

        tracker.add_transaction(addr, Transaction::new::<ConnectMsg>());
        assert_eq!(tracker.active_transaction_count(), 1);

        tracker.add_transaction(addr, Transaction::new::<ConnectMsg>());
        assert_eq!(tracker.active_transaction_count(), 2);
    }

    #[test]
    fn active_transaction_count_multiple_peers() {
        let tracker = LiveTransactionTracker::new();
        let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();

        tracker.add_transaction(addr1, Transaction::new::<ConnectMsg>());
        tracker.add_transaction(addr1, Transaction::new::<ConnectMsg>());
        tracker.add_transaction(addr2, Transaction::new::<ConnectMsg>());

        assert_eq!(tracker.active_transaction_count(), 3);
    }

    #[test]
    fn active_transaction_count_after_removal() {
        let tracker = LiveTransactionTracker::new();
        let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();

        let tx1 = Transaction::new::<ConnectMsg>();
        let tx2 = Transaction::new::<ConnectMsg>();

        tracker.add_transaction(addr, tx1);
        tracker.add_transaction(addr, tx2);
        assert_eq!(tracker.active_transaction_count(), 2);

        tracker.remove_finished_transaction(tx1);
        assert_eq!(tracker.active_transaction_count(), 1);

        tracker.remove_finished_transaction(tx2);
        assert_eq!(tracker.active_transaction_count(), 0);
    }

    #[test]
    fn acquisition_count_empty() {
        let tracker = LiveTransactionTracker::new();
        assert_eq!(tracker.active_acquisition_transaction_count(), 0);
    }

    /// Regression test for the residual half of #4348: the
    /// `connection_maintenance` acquisition throttle must count ONLY CONNECTs
    /// this node initiated itself, not CONNECTs it is relaying for other peers.
    /// Relayed CONNECTs land in `tx_per_peer` (via `add_transaction`) and are
    /// held until the op completes or hits the 60s TTL; counting them let a
    /// relay-heavy node's own acquisition budget (only `BASE = 3` once at/above
    /// `min_connections`) be consumed by other peers' traffic, stalling its
    /// growth flat just above `min_connections`.
    #[test]
    fn acquisition_count_excludes_relayed_connects() {
        let tracker = LiveTransactionTracker::new();
        let peer1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let peer2: SocketAddr = "127.0.0.1:8081".parse().unwrap();

        // Three CONNECTs this node is merely relaying for others.
        tracker.add_transaction(peer1, Transaction::new::<ConnectMsg>());
        tracker.add_transaction(peer1, Transaction::new::<ConnectMsg>());
        tracker.add_transaction(peer2, Transaction::new::<ConnectMsg>());

        // None of them are this node's own acquisition attempts.
        assert_eq!(
            tracker.active_acquisition_transaction_count(),
            0,
            "relayed CONNECTs must not consume the acquisition throttle budget"
        );

        // One CONNECT this node initiated itself via acquire_new.
        let own = Transaction::new::<ConnectMsg>();
        tracker.add_transaction(peer1, own);
        tracker.register_acquisition(own);
        assert_eq!(tracker.active_acquisition_transaction_count(), 1);
    }

    /// A self-initiated CONNECT rebound across hops (acquire_new registers once,
    /// then the tx is forwarded to successive peers via `add_transaction`) is
    /// still a single in-flight acquisition.
    #[test]
    fn acquisition_count_unaffected_by_cross_peer_rebind() {
        let tracker = LiveTransactionTracker::new();
        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();
        let tx = Transaction::new::<ConnectMsg>();

        tracker.register_acquisition(tx);
        tracker.add_transaction(addr1, tx);
        tracker.add_transaction(addr2, tx); // rebind to a second hop

        assert_eq!(tracker.active_acquisition_transaction_count(), 1);
    }

    /// The acquisition gauge drains on transaction completion (and only then),
    /// so a completed acquisition frees its throttle slot and the set cannot
    /// leak and pin acquisition at the cap.
    #[test]
    fn acquisition_count_drains_on_completion() {
        let tracker = LiveTransactionTracker::new();
        let addr: SocketAddr = "127.0.0.1:9001".parse().unwrap();

        let first = Transaction::new::<ConnectMsg>();
        tracker.add_transaction(addr, first);
        tracker.register_acquisition(first);
        let second = Transaction::new::<ConnectMsg>();
        tracker.add_transaction(addr, second);
        tracker.register_acquisition(second);
        assert_eq!(tracker.active_acquisition_transaction_count(), 2);

        tracker.remove_finished_transaction(first);
        assert_eq!(tracker.active_acquisition_transaction_count(), 1);

        tracker.remove_finished_transaction(second);
        assert_eq!(tracker.active_acquisition_transaction_count(), 0);
    }

    /// Pruning a peer (a disconnect, NOT completion) must NOT drain the
    /// acquisition gauge: the orphaned acquisition is still in flight (its
    /// driver retries or fails and reaches `remove_finished_transaction`). This
    /// holds even for a tx cross-peer-rebound to a still-live peer, where a
    /// remove-on-prune would undercount and let the maintenance throttle exceed
    /// max_concurrent during churn (#4348 review). The gauge clears only on
    /// completion.
    #[test]
    fn acquisition_count_unaffected_by_prune() {
        let tracker = LiveTransactionTracker::new();
        let old_peer: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let new_peer: SocketAddr = "127.0.0.1:9002".parse().unwrap();
        let tx = Transaction::new::<ConnectMsg>();

        tracker.register_acquisition(tx);
        tracker.add_transaction(old_peer, tx);
        tracker.add_transaction(new_peer, tx); // rebind; old_peer entry now stale
        assert_eq!(tracker.active_acquisition_transaction_count(), 1);

        // old_peer disconnects — its stale entry is pruned, but the tx is still
        // in flight (live on new_peer), so the acquisition gauge must stay at 1.
        tracker.prune_transactions_from_peer(old_peer);
        assert_eq!(
            tracker.active_acquisition_transaction_count(),
            1,
            "prune must not drain an in-flight acquisition"
        );
        assert!(tracker.has_live_connection(new_peer));

        // Completion finally drains it.
        tracker.remove_finished_transaction(tx);
        assert_eq!(tracker.active_acquisition_transaction_count(), 0);
    }

    #[test]
    fn prune_transactions_from_peer_cleans_both_indices() {
        let tracker = LiveTransactionTracker::new();
        let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();

        let tx1 = Transaction::new::<ConnectMsg>();
        let tx2 = Transaction::new::<GetMsg>();
        let tx3 = Transaction::new::<PutMsg>();

        // Add transactions for two peers
        tracker.add_transaction(addr1, tx1);
        tracker.add_transaction(addr1, tx2);
        tracker.add_transaction(addr2, tx3);

        assert_eq!(tracker.active_transaction_count(), 3);
        assert_eq!(tracker.peer_for_tx.len(), 3);

        // Prune peer1
        tracker.prune_transactions_from_peer(addr1);

        // peer1's transactions should be gone from both indices
        assert_eq!(tracker.active_transaction_count(), 1);
        assert_eq!(tracker.peer_for_tx.len(), 1);
        assert!(!tracker.peer_for_tx.contains_key(&tx1));
        assert!(!tracker.peer_for_tx.contains_key(&tx2));
        assert!(tracker.peer_for_tx.contains_key(&tx3));

        // peer2's transaction should still exist
        assert!(tracker.has_live_connection(addr2));
        assert!(!tracker.has_live_connection(addr1));
    }

    #[test]
    fn prune_transactions_from_peer_returns_transactions() {
        let tracker = LiveTransactionTracker::new();
        let addr1: SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:8081".parse().unwrap();

        let tx1 = Transaction::new::<ConnectMsg>();
        let tx2 = Transaction::new::<GetMsg>();
        let tx3 = Transaction::new::<PutMsg>();

        // Add transactions for two peers
        tracker.add_transaction(addr1, tx1);
        tracker.add_transaction(addr1, tx2);
        tracker.add_transaction(addr2, tx3);

        // Prune peer1 and check returned transactions
        let pruned = tracker.prune_transactions_from_peer(addr1);
        assert_eq!(pruned.len(), 2);
        assert!(pruned.contains(&tx1));
        assert!(pruned.contains(&tx2));

        // Prune peer2 and check returned transaction
        let pruned = tracker.prune_transactions_from_peer(addr2);
        assert_eq!(pruned.len(), 1);
        assert!(pruned.contains(&tx3));

        // Prune nonexistent peer returns empty
        let pruned = tracker.prune_transactions_from_peer(addr1);
        assert!(pruned.is_empty());
    }

    /// Pins the load-bearing invariant for `Ring::connection_maintenance`'s
    /// `has_live_connection` neighbor filter (`crates/core/src/ring.rs:3110`):
    /// once a `tx` has been
    /// registered against a peer, `has_live_connection(that_peer)` must
    /// remain `true` until the tx is explicitly cleared, even after the
    /// same `tx` is re-registered against another peer.
    ///
    /// Tightening this (`add_transaction` made rebind-safe in an earlier
    /// commit of #4164) caused `test_six_peer_contract_lifecycle` to
    /// diverge: topology saw a wider candidate-neighbor set and made a
    /// different RemoveConnections decision, breaking a CRDT broadcast
    /// chain. See the topology-coupling note in
    /// `LiveTransactionTracker::add_transaction`'s rustdoc.
    #[test]
    fn add_transaction_rebind_preserves_old_peer_has_live_connection() {
        let tracker = LiveTransactionTracker::new();
        let addr1: SocketAddr = "127.0.0.1:9001".parse().unwrap();
        let addr2: SocketAddr = "127.0.0.1:9002".parse().unwrap();
        let tx = Transaction::new::<GetMsg>();

        tracker.add_transaction(addr1, tx);
        tracker.add_transaction(addr2, tx);

        assert!(
            tracker.has_live_connection(addr1),
            "old peer must retain has_live_connection=true after rebind \
             (topology-coupling invariant for Ring::connection_maintenance)"
        );
        assert!(tracker.has_live_connection(addr2));
    }
}