tor-memquota 0.44.0

Memory use tracking and quota utilities, used by Tor software
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
//! Reclamation algorithm
//!
//! Implementation of the long-running [`task`] function
//! (which is the only export from here to the wider `mtracker` module).

use super::*;

mod deferred_drop;

use deferred_drop::{DeferredDrop, GuardWithDeferredDrop};

/// Total number of participants
///
/// Used in reporting and in calculations of various edge cases.
/// On 64-bit systems this is wider than the refcounts (which are all `u32`);
/// on 32-bit systems it is the same width, so it can only just hold them.
type NumParticips = usize;

//========== candidate victim analysis ==========

/// The nominal data age of a participant
///
/// The derived `Ord` compares variants in declaration order, so
/// `TreatAsVeryOld` sorts before every `Actual(_)`, ie it is treated as older
/// than any real age.  Among `Actual` values the ordering is that of
/// `CoarseInstant` (presumably earliest first).
///
/// Do not reorder the variants: the reclamation heap relies on this ordering.
#[derive(Ord, PartialOrd, Eq, PartialEq)]
enum Age {
    /// Treat this participant as having very old data
    TreatAsVeryOld,
    /// Data age value from the [`IsParticipant`]
    Actual(CoarseInstant),
}

/// Participant status, as a candidate victim
///
/// Produced by `analyse_particip`.
enum PStatus {
    /// Participant is a candidate victim, whose data has the given age
    Candidate(Age),
    /// Tear this participant down right away
    ///
    /// (Its weak reference could no longer be upgraded, or `get_oldest` panicked.)
    TearDown,
    /// Treat participant as not having any data; don't reclaim from it
    NoData,
}

/// Outcome of a completed reclamation run
///
/// This is used only within `choose_victims`, and only for logging
/// (via its `Display` impl).
#[derive(Debug, derive_more::Display)]
enum Outcome {
    /// We reached the low water mark
    #[display("complete")]
    TargetReached,

    /// We didn't, but we have so many participants that that's possibly expected
    ///
    /// (Can only happen on 32-bit platforms.)
    #[display("{} participants, good enough - stopping", n_particips)]
    GoodEnough {
        /// The number of participants (sum of all participants' refcounts)
        n_particips: NumParticips,
    },
}

/// Figure out whether a participant is a candidate victim, and obtain its data age
///
/// Upgrades the participant's weak reference and asks it for the age of its
/// oldest data.  The upgraded `Arc` is handed to `defer_drop` rather than being
/// dropped here.
///
/// Returns:
///  * `TearDown` if the participant has vanished, or if `get_oldest` panicked;
///  * `Candidate(Age::Actual(_))` if it reports data of some age;
///  * `Candidate(Age::TreatAsVeryOld)` if it reports no data, but our
///    accounting says it holds more than its clones could have cached;
///  * `NoData` otherwise.
fn analyse_particip(precord: &PRecord, defer_drop: &mut DeferredDrop) -> PStatus {
    // A dead weak reference means the participant has vanished.  We can't
    // reclaim from it (it may already be reclaiming), so delete it.
    let particip = match precord.particip.upgrade() {
        Some(live) => live,
        None => return PStatus::TearDown,
    };

    let oldest = catch_unwind(AssertUnwindSafe(|| particip.get_oldest(precord.enabled)));
    defer_drop.push(particip);

    let Ok(oldest) = oldest else {
        // The panic payload is of a useless type; just report the fact.
        error!("bug in memory tracker: call to get_oldest panicked!");
        return PStatus::TearDown;
    };

    if let Some(age) = oldest {
        return PStatus::Candidate(Age::Actual(age));
    }

    // No data is claimed, but each clone may legitimately have up to
    // MAX_CACHE cached; check that our accounting agrees.
    let clones = precord.refcount.as_usize();
    let Some(max_cached) = clones.checked_mul(MAX_CACHE.as_usize()) else {
        // So many Participation clones that the bound overflowed (only possible
        // on 32-bit platforms, as refcount is a u32).  Best to reclaim from it.
        log_ratelim!(
            "memtrack: participant with many clones claims to have no data";
            Err::<Void, _>(internal!("{} Participation clones", *precord.refcount));
        );
        return PStatus::Candidate(Age::TreatAsVeryOld);
    };

    if precord.used.as_raw() > Qty(max_cached) {
        // The participant's claim contradicts our accounting.
        log_ratelim!(
            "memtrack: participant claims to have no data, but our accounting disagrees";
            Err::<Void, _>(internal!("{} used (by {} clones)", precord.used, *precord.refcount));
        );
        return PStatus::Candidate(Age::TreatAsVeryOld);
    }

    // Plausibly empty (anything it holds is within its cache allowance)
    PStatus::NoData
}

//========== reclamation algorithm, the main pieces ==========

/// State while reclamation is active
struct Reclaiming {
    /// The heap of candidates, oldest at top of heap
    ///
    /// `BinaryHeap` is a max-heap, so entries are wrapped in `Reverse`, making
    /// the smallest `(Age, AId)` (ie the oldest data) pop first.
    heap: BinaryHeap<Reverse<(Age, AId)>>,

    /// Make this type uninhabited if memory tracking is compiled out
    enabled: EnabledToken,
}

/// A victim we have selected for reclamation
///
/// This designates a specific Participant.
///
/// But, note that we always reclaim from an Account, so if we are reclaiming
/// from one `Victim`, we may be reclaiming from other `Victim`s with the same
/// `AId` and different `IsParticipant`s.  And because of inheritance, we might
/// be reclaiming from other Accounts too.
///
/// The participant is held in a `ProtectedArc`; it is only unwrapped (with
/// `promise_dropping_is_ok`) once the state lock is no longer held.
type Victim = (AId, drop_reentrancy::ProtectedArc<dyn IsParticipant>);

/// Marker indicating that the victim's reclaim function panicked
///
/// (The panic payload itself is discarded.)
struct VictimPanicked;

/// Set of responses from the victims, after they have all finished reclaiming.
///
/// One entry per `Victim`, tagged with that victim's `AId`.
type VictimResponses = Vec<(AId, Result<Reclaimed, VictimPanicked>)>;

impl Reclaiming {
    /// Check to see if we should start reclaiming, and if so return a `Reclaiming`
    ///
    ///  1. Checks to see if usage is above `max`; if not, returns `None`
    ///  2. Logs that we're starting reclamation
    ///  3. Calculates the heap of data ages
    ///
    /// While building the heap, any participant that `analyse_particip` says
    /// should be torn down is auto-released and removed from its account.
    fn maybe_start(state: &mut GuardWithDeferredDrop) -> Option<Self> {
        let (state, deferred_drop) = state.deref_mut_both();

        if *state.total_used <= state.global.config.max {
            return None;
        }

        info!(
            "memory tracking: {} > {}, reclamation started (target {})",
            *state.total_used, state.config.max, state.config.low_water,
        );

        // `BinaryHeap` is a max heap, so entries are wrapped in `Reverse`:
        // that way the smallest `(Age, AId)` - the oldest data - pops first.
        let mut heap = BinaryHeap::new();

        // Build heap of participants we might want to reclaim from
        // (and, while we're at it, tear down broken participants)
        for (aid, arecord) in state.accounts.iter_mut() {
            arecord.ps.retain(|_pid, precord| {
                match analyse_particip(precord, deferred_drop) {
                    PStatus::Candidate(age) => {
                        heap.push(Reverse((age, aid)));
                        true // retain
                    }
                    PStatus::NoData => {
                        true // retain
                    }
                    PStatus::TearDown => {
                        precord.auto_release(&mut state.global);
                        false // remove
                    }
                }
            });
        }

        Some(Reclaiming {
            heap,
            enabled: state.enabled,
        })
    }

    /// If we're reclaiming, choose the next victim(s) to reclaim
    ///
    /// This is the account whose participant has the oldest data age,
    /// and all of that account's children.
    ///
    /// We might discover that we didn't want to continue reclamation after all:
    /// this function is responsible for checking our progress
    /// against the low water mark.
    ///
    /// If reclamation should stop, this function logs, and returns `Ok(None)`.
    /// The returned list of victims may be empty, eg if the chosen account
    /// (or all of its participants) has since gone away.
    ///
    /// # Errors
    ///
    /// Returns an error if no candidates remain, but usage is still above the
    /// low water mark by more than the remaining participants' caches can
    /// explain: that means our accounting state is corrupted.
    fn choose_victims(&mut self, state: &mut State) -> Result<Option<Vec<Victim>>, ReclaimCrashed> {
        let stop = |state: &mut State, outcome| {
            info!(
                "memory tracking reclamation reached: {} (target {}): {}",
                *state.total_used, state.config.low_water, outcome,
            );
            Ok(None)
        };

        if *state.total_used <= state.config.low_water {
            return stop(state, Outcome::TargetReached);
        }
        let Some(Reverse((_, oldest_aid))) = self.heap.pop() else {
            // All our remaining participants are NoData.
            let n_particips: NumParticips = state
                .accounts
                .values()
                .map(|ar| {
                    ar.ps
                        .values()
                        .map(
                            |pr| *pr.refcount as NumParticips, // refcount is u32, so this is fine
                        )
                        .sum::<NumParticips>()
                })
                .sum::<NumParticips>();

            // `analyse_particip` accepts a NoData participant as plausible when
            // `used <= refcount * MAX_CACHE`.  So the remaining usage is explained
            // by caches iff `total_used <= n_particips * MAX_CACHE`.  (Comparing
            // the average with a strict `<` would wrongly report corruption
            // when every participant sits exactly at its allowed cache.)
            let max_cache = usize::from(MAX_CACHE);
            let total_used = state.total_used.as_raw().as_usize();
            let explained_by_caches = match n_particips.checked_mul(max_cache) {
                // No participants at all: nothing can explain the usage
                _ if n_particips == 0 => false,
                // The bound overflowed `usize`, so `total_used` is certainly within it
                None => true,
                Some(cache_bound) => total_used <= cache_bound,
            };

            if explained_by_caches {
                // On 32-bit, this could happen due to the cache, if we have
                // 2^32 / MAX_CACHE participants.
                return stop(state, Outcome::GoodEnough { n_particips });
            }

            // Oh dear.
            return Err(internal!(
                "memory accounting state corrupted: used={} n_particips={} all NoData",
                *state.total_used,
                n_particips,
            )
            .into());
        };

        // When we do partial reclamation, rather than just Collapsing:
        //
        // fudge next_oldest by something to do with number of loop iterations,
        // to avoid one-allocation-each-time ping pong between multiple caches
        //
        // (this match statement will fail to compile when we add a non-Collapsing variant)
        //
        // let next_oldest = heap.peek_lowest();
        match None {
            None | Some(Reclaimed::Collapsing) => {}
        }

        let victim_aids = state.get_aid_and_children_recursively(oldest_aid);

        let victims: Vec<Victim> = {
            let mut particips = vec![];
            for aid in victim_aids {
                let Some(arecord) = state.accounts.get_mut(aid) else {
                    // shouldn't happen but no need to panic
                    continue;
                };
                arecord.ps.retain(|_pid, precord| {
                    let Some(particip) = precord.particip.upgrade() else {
                        // tear this down!
                        precord.auto_release(&mut state.global);
                        return false;
                    };
                    particips.push((aid, particip));
                    true
                });
            }
            particips
        };

        Ok(Some(victims))
    }

    /// Notify the chosen victims and obtain their responses
    ///
    /// This is the async part, and is done with the state unlocked.
    /// Each victim's `reclaim` future is polled concurrently via `join_all`;
    /// a panic in one is caught and reported as `VictimPanicked`.
    // Doesn't actually need `self`, only `victims`, but we take it for form's sake
    async fn notify_victims(&mut self, victims: Vec<Victim>) -> VictimResponses {
        let enabled = self.enabled;

        futures::future::join_all(
            //
            victims.into_iter().map(|(aid, particip)| async move {
                let particip = particip.promise_dropping_is_ok();
                // We run the `.reclaim()` calls within the same task (since that's what
                // `join_all` does).  So they all run on whatever executor thread is polling
                // the reclamation task.
                let reclaimed = AssertUnwindSafe(particip.reclaim(enabled))
                    .catch_unwind()
                    .await
                    .map_err(|_panicked| VictimPanicked);
                // We drop the `ProtectedArc<dyn IsParticipant>` here, which is OK
                // because we don't hold the lock.  Since drop isn't async, and
                // `join_all` doesn't spawn tasks, we drop them sequentially.
                (aid, reclaimed)
            }),
        )
        .await
    }

    /// Process the victims' responses and update `state` accordingly
    ///
    /// For each victim that reported `Collapsing`, or whose `reclaim` panicked,
    /// the victim's whole account is removed and auto-released.
    // Doesn't actually need `self`, only `state`, but we take it for form's sake
    fn handle_victim_responses(&mut self, state: &mut State, responses: VictimResponses) {
        for (aid, reclaimed) in responses {
            match reclaimed {
                Ok(Reclaimed::Collapsing) | Err(VictimPanicked) => {
                    let Some(mut arecord) = state.accounts.remove(aid) else {
                        // Account is gone, fair enough
                        continue;
                    };
                    arecord.auto_release(&mut state.global);
                    // Account is definitely gone now
                }
            }
        }
    }
}

//========== the reclamation task, in terms of the pieces ==========

/// Return value from the task, when it finishes due to the tracker being shut down
///
/// (Either the tracker itself, or the sender half of the wakeup channel, was dropped.)
struct TaskFinished;

/// Reclaim memory until we reach low water, if necessary
///
/// Looks to see if we're above `config.max`.
/// If so, constructs a list of victims, and starts reclaiming from them,
/// until we reach low water.
///
/// Errors from locking the tracker, and from `choose_victims`, are propagated.
/// `_enabled` is not used in the body; it only attests that tracking is compiled in.
async fn inner_loop(
    tracker: &Arc<MemoryQuotaTracker>,
    _enabled: EnabledToken,
) -> Result<(), ReclaimCrashed> {
    let mut reclaiming;
    let mut victims;
    {
        // `maybe_start` hands upgraded participant `Arc`s to the deferred-drop
        // list; presumably this guard drops them only after the lock is released.
        let mut state_guard = GuardWithDeferredDrop::new(tracker.lock()?.enabled_or_bug()?);

        let Some(r) = Reclaiming::maybe_start(&mut state_guard) else {
            return Ok(());
        };
        reclaiming = r;

        // Duplicating this call to reclaiming.choose_victims means we don't
        // release the lock between `maybe_start` and `choose_victims` (here)
        // and between `handle_victim_responses` and `choose_victims` (below).
        // (Releasing the lock would not be a bug, but it's not desirable.)
        let Some(v) = reclaiming.choose_victims(&mut state_guard)? else {
            return Ok(());
        };
        victims = v;
    }

    loop {
        // The lock is not held while the victims are being notified:
        // `state_guard` below is local to each iteration.
        let responses = reclaiming.notify_victims(mem::take(&mut victims)).await;
        let mut state_guard = tracker.lock()?.enabled_or_bug()?;
        reclaiming.handle_victim_responses(&mut state_guard, responses);
        let Some(v) = reclaiming.choose_victims(&mut state_guard)? else {
            return Ok(());
        };
        victims = v;
    }
}

/// Internal long-running task, handling reclamation - main loop
///
/// Each round runs one reclamation pass (if needed) and then waits for a
/// wakeup.  Returns `Ok(TaskFinished)` once the tracker, or the wakeup sender,
/// has gone away.  Handles routine logging, but not termination.
async fn task_loop(
    tracker: &Weak<MemoryQuotaTracker>,
    mut wakeup: mpsc::Receiver<()>,
    enabled: EnabledToken,
) -> Result<TaskFinished, ReclaimCrashed> {
    loop {
        // A strong reference is held only for the duration of one pass, so that
        // dropping the last real client handle is noticed here.
        match tracker.upgrade() {
            Some(strong) => inner_loop(&strong, enabled).await?,
            None => return Ok(TaskFinished),
        }

        if wakeup.next().await.is_none() {
            // Sender dropped
            return Ok(TaskFinished);
        }
    }
}

/// Internal long-running task, handling reclamation
///
/// This is the entrypoint used by the rest of the `tracker`.
/// It handles logging of crashes: if the main loop fails, the tracker's usage
/// counter is poisoned (best effort; skipped if the tracker is gone, tracking
/// is disabled, or its lock cannot be taken) and the error is reported.
pub(super) async fn task(
    tracker: Weak<MemoryQuotaTracker>,
    wakeup: mpsc::Receiver<()>,
    enabled: EnabledToken,
) {
    let Err(bug) = task_loop(&tracker, wakeup, enabled).await else {
        // Orderly shutdown
        return;
    };

    if let Some(strong) = tracker.upgrade() {
        if let Some(shared) = strong.state.as_enabled() {
            if let Ok(mut guard) = shared.lock() {
                guard.total_used.set_poisoned();
            }
        }
    }
    error_report!(bug, "memory tracker task failed");
}