use super::*;
mod deferred_drop;
use deferred_drop::{DeferredDrop, GuardWithDeferredDrop};
/// A count of participants, as reported in [`Outcome::GoodEnough`].
type NumParticips = usize;
/// The age attributed to a participant when ranking reclamation candidates.
///
/// The derived ordering follows variant declaration order, so
/// `TreatAsVeryOld` compares less than every `Actual(_)` value.
/// The candidate heap in `Reclaiming` pops the smallest `Age` first.
#[derive(Ord, PartialOrd, Eq, PartialEq)]
enum Age {
/// No usable age was reported, but the participant looks suspicious;
/// rank it ahead of every participant with an actual age.
TreatAsVeryOld,
/// The value returned by the participant's `get_oldest`.
Actual(CoarseInstant),
}
/// Classification of one participant record, as produced by `analyse_particip`.
enum PStatus {
/// The participant is a reclamation candidate with the given age;
/// `Reclaiming::maybe_start` pushes it onto the candidate heap.
Candidate(Age),
/// The participant is gone or misbehaved (its `get_oldest` panicked);
/// `Reclaiming::maybe_start` releases and removes its record.
TearDown,
/// The participant reports no data and its accounting looks plausible;
/// the record is kept but is not a candidate.
NoData,
}
/// Why a reclamation run stopped without choosing further victims.
///
/// The `Display` form is embedded in the log message emitted by
/// `Reclaiming::choose_victims`.
#[derive(Debug, derive_more::Display)]
enum Outcome {
/// Total usage is at or below the configured low-water mark.
#[display("complete")]
TargetReached,
/// There are no candidates left, but usage averaged over the remaining
/// participants is small enough to be explained by per-participant caching
/// (see `Reclaiming::choose_victims`).
#[display("{} participants, good enough - stopping", n_particips)]
GoodEnough {
/// Number of participants (summed reference counts) the usage was averaged over.
n_particips: NumParticips,
},
}
/// Classify one participant record for the purposes of reclamation.
///
/// * If the participant can no longer be upgraded, or its `get_oldest`
///   panics, the result is [`PStatus::TearDown`].
/// * If `get_oldest` reports an age, the result is a
///   [`PStatus::Candidate`] with that age.
/// * If `get_oldest` reports nothing, the recorded usage is compared with
///   `refcount * MAX_CACHE`.  Usage above that (or an overflowing product)
///   is logged and reported as a `Candidate` with [`Age::TreatAsVeryOld`];
///   otherwise the result is [`PStatus::NoData`].
///
/// The upgraded participant reference is never dropped here: it is pushed
/// onto `defer_drop` so that the caller controls when it is released.
fn analyse_particip(precord: &PRecord, defer_drop: &mut DeferredDrop) -> PStatus {
    let particip = match precord.particip.upgrade() {
        Some(live) => live,
        None => return PStatus::TearDown,
    };

    // The participant is foreign code: contain any panic it raises.
    let oldest = catch_unwind(AssertUnwindSafe(|| particip.get_oldest(precord.enabled)));
    // Hand our reference over rather than dropping it in this function.
    defer_drop.push(particip);

    match oldest {
        Ok(Some(instant)) => return PStatus::Candidate(Age::Actual(instant)),
        Err(_panicked) => {
            error!("bug in memory tracker: call to get_oldest panicked!");
            return PStatus::TearDown;
        }
        // No data claimed; fall through to the plausibility check.
        Ok(None) => {}
    }

    // Each clone is allowed up to MAX_CACHE of recorded-but-unused quota.
    match precord
        .refcount
        .as_usize()
        .checked_mul(MAX_CACHE.as_usize())
    {
        None => {
            log_ratelim!(
                "memtrack: participant with many clones claims to have no data";
                Err::<Void, _>(internal!("{} Participation clones", *precord.refcount));
            );
            PStatus::Candidate(Age::TreatAsVeryOld)
        }
        Some(max_cached) if precord.used.as_raw() > Qty(max_cached) => {
            log_ratelim!(
                "memtrack: participant claims to have no data, but our accounting disagrees";
                Err::<Void, _>(internal!("{} used (by {} clones)", precord.used, *precord.refcount));
            );
            PStatus::Candidate(Age::TreatAsVeryOld)
        }
        Some(_) => PStatus::NoData,
    }
}
/// State of one in-progress reclamation run.
struct Reclaiming {
/// Candidate accounts keyed by age.
///
/// `BinaryHeap` is a max-heap, so the `Reverse` wrapper makes `pop`
/// yield the entry with the smallest `(Age, AId)` first.
heap: BinaryHeap<Reverse<(Age, AId)>>,
/// Token passed to each victim's `reclaim` call.
enabled: EnabledToken,
}
/// A participant chosen for reclamation: its account id, plus the handle
/// obtained by upgrading the participant record's weak reference.
type Victim = (AId, drop_reentrancy::ProtectedArc<dyn IsParticipant>);
/// Marker recorded when a victim's `reclaim` call panicked.
struct VictimPanicked;
/// The result of `reclaim` for each notified victim, tagged with its account id.
type VictimResponses = Vec<(AId, Result<Reclaimed, VictimPanicked>)>;
impl Reclaiming {
    /// Decide whether reclamation is needed and, if so, set up a run.
    ///
    /// Returns `None` when total usage does not exceed the configured
    /// maximum.  Otherwise logs the start of reclamation, analyses every
    /// participant of every account with `analyse_particip`, and returns a
    /// `Reclaiming` whose heap holds one entry per candidate participant.
    ///
    /// Records classified as `TearDown` are released and removed on the way.
    fn maybe_start(state: &mut GuardWithDeferredDrop) -> Option<Self> {
        let (state, deferred_drop) = state.deref_mut_both();

        if *state.total_used <= state.global.config.max {
            return None;
        }

        info!(
            "memory tracking: {} > {}, reclamation started (target {})",
            *state.total_used, state.config.max, state.config.low_water,
        );

        let mut heap = BinaryHeap::new();
        for (aid, arecord) in state.accounts.iter_mut() {
            // `retain` lets us drop dead records in the same pass.
            arecord.ps.retain(|_pid, precord| {
                match analyse_particip(precord, deferred_drop) {
                    PStatus::Candidate(age) => {
                        heap.push(Reverse((age, aid)));
                        true
                    }
                    PStatus::NoData => true,
                    PStatus::TearDown => {
                        precord.auto_release(&mut state.global);
                        false
                    }
                }
            });
        }

        Some(Reclaiming {
            heap,
            enabled: state.enabled,
        })
    }

    /// Pick the next batch of victims, or decide that reclamation is over.
    ///
    /// Returns:
    ///  * `Ok(None)` if usage is at or below the low-water mark, or if no
    ///    candidates remain and the remaining usage is explained by the
    ///    per-participant cache allowance.  The reason is logged.
    ///  * `Ok(Some(victims))` with every live participant of the oldest
    ///    candidate account and of the accounts returned for it by
    ///    `get_aid_and_children_recursively`.
    ///  * `Err(_)` if no candidates remain but usage is too high to be
    ///    explained by caching, which indicates corrupted accounting.
    fn choose_victims(&mut self, state: &mut State) -> Result<Option<Vec<Victim>>, ReclaimCrashed> {
        let stop = |state: &mut State, outcome| {
            info!(
                "memory tracking reclamation reached: {} (target {}): {}",
                *state.total_used, state.config.low_water, outcome,
            );
            Ok(None)
        };

        if *state.total_used <= state.config.low_water {
            return stop(state, Outcome::TargetReached);
        }

        let Some(Reverse((_, oldest_aid))) = self.heap.pop() else {
            // Nobody admits to holding data.  That is acceptable only if
            // the usage can be explained by each participant's cache.
            let n_particips: usize = state
                .accounts
                .values()
                .map(|ar| {
                    ar.ps
                        .values()
                        .map(|pr| *pr.refcount as NumParticips)
                        .sum::<NumParticips>()
                })
                .sum::<NumParticips>();

            // `analyse_particip` treats `used == refcount * MAX_CACHE` as
            // legitimate, so an average of exactly MAX_CACHE must be
            // accepted here too; hence `<=`, not `<`.  (The division rounds
            // down, so this is marginally lenient rather than strict.)
            // `checked_div` yields `None` for zero participants, which
            // falls through to the error below.
            if state
                .total_used
                .as_raw()
                .as_usize()
                .checked_div(n_particips)
                .is_some_and(|per_particip| per_particip <= usize::from(MAX_CACHE))
            {
                return stop(state, Outcome::GoodEnough { n_particips });
            }

            return Err(internal!(
                "memory accounting state corrupted: used={} n_particips={} all NoData",
                *state.total_used,
                n_particips,
            )
            .into());
        };

        // Compile-time check only: this match stops being exhaustive if
        // `Reclaimed` gains a variant, forcing the logic here to be revisited.
        match None {
            None | Some(Reclaimed::Collapsing) => {}
        }

        let victim_aids = state.get_aid_and_children_recursively(oldest_aid);
        let victims: Vec<Victim> = {
            let mut particips = vec![];
            for aid in victim_aids {
                let Some(arecord) = state.accounts.get_mut(aid) else {
                    // Account already gone; nothing to notify.
                    continue;
                };
                arecord.ps.retain(|_pid, precord| {
                    let Some(particip) = precord.particip.upgrade() else {
                        // Participant has gone away: release its record now.
                        precord.auto_release(&mut state.global);
                        return false;
                    };
                    particips.push((aid, particip));
                    true
                });
            }
            particips
        };

        Ok(Some(victims))
    }

    /// Call `reclaim` on every victim concurrently and collect the results.
    ///
    /// A panic inside a victim's `reclaim` future is caught and reported as
    /// `Err(VictimPanicked)` for that victim; it does not affect the others.
    async fn notify_victims(&mut self, victims: Vec<Victim>) -> VictimResponses {
        let enabled = self.enabled;
        futures::future::join_all(
            victims.into_iter().map(|(aid, particip)| async move {
                // NOTE(review): presumably the caller holds no tracker lock
                // here, which is what makes this promise valid - confirm.
                let particip = particip.promise_dropping_is_ok();
                let reclaimed = AssertUnwindSafe(particip.reclaim(enabled))
                    .catch_unwind()
                    .await
                    .map_err(|_panicked| VictimPanicked);
                (aid, reclaimed)
            }),
        )
        .await
    }

    /// Apply the victims' responses to the tracker state.
    ///
    /// Both a `Collapsing` reply and a panic are handled the same way: the
    /// victim's account record is removed and released.  Accounts that are
    /// already gone are skipped.
    fn handle_victim_responses(&mut self, state: &mut State, responses: VictimResponses) {
        for (aid, reclaimed) in responses {
            match reclaimed {
                Ok(Reclaimed::Collapsing) | Err(VictimPanicked) => {
                    let Some(mut arecord) = state.accounts.remove(aid) else {
                        continue;
                    };
                    arecord.auto_release(&mut state.global);
                }
            }
        }
    }
}
/// Marker returned by `task_loop` when the reclamation task should exit normally.
struct TaskFinished;
/// Run one complete reclamation, if the tracker currently needs one.
///
/// Returns `Ok(())` both when no reclamation was necessary and when a run
/// finished.  Errors from locking the tracker or from
/// `Reclaiming::choose_victims` are propagated.
///
/// The tracker lock is taken only for the synchronous bookkeeping steps; it
/// is released before victims are notified, so it is never held across the
/// `.await`.
async fn inner_loop(
    tracker: &Arc<MemoryQuotaTracker>,
    _enabled: EnabledToken,
) -> Result<(), ReclaimCrashed> {
    // Initial round, under a guard that defers dropping participants.
    let (mut reclaiming, mut victims) = {
        let mut guard = GuardWithDeferredDrop::new(tracker.lock()?.enabled_or_bug()?);
        let mut started = match Reclaiming::maybe_start(&mut guard) {
            Some(started) => started,
            None => return Ok(()),
        };
        let first = match started.choose_victims(&mut guard)? {
            Some(first) => first,
            None => return Ok(()),
        };
        (started, first)
    };

    // Alternate: notify (unlocked), then account for replies and re-choose (locked).
    loop {
        let responses = reclaiming.notify_victims(mem::take(&mut victims)).await;

        let mut guard = tracker.lock()?.enabled_or_bug()?;
        reclaiming.handle_victim_responses(&mut guard, responses);
        victims = match reclaiming.choose_victims(&mut guard)? {
            Some(next) => next,
            None => return Ok(()),
        };
    }
}
/// Body of the reclamation task: reclaim, then sleep until woken, forever.
///
/// A reclamation pass is attempted before the first wait.  The loop ends
/// with `Ok(TaskFinished)` when the tracker can no longer be upgraded or
/// when `wakeup` yields `None`; an error from `inner_loop` is propagated.
///
/// The upgraded tracker reference is held only for the duration of one
/// `inner_loop` call, not while waiting on `wakeup`.
async fn task_loop(
    tracker: &Weak<MemoryQuotaTracker>,
    mut wakeup: mpsc::Receiver<()>,
    enabled: EnabledToken,
) -> Result<TaskFinished, ReclaimCrashed> {
    loop {
        match tracker.upgrade() {
            Some(strong) => inner_loop(&strong, enabled).await?,
            None => return Ok(TaskFinished),
        }

        if wakeup.next().await.is_none() {
            return Ok(TaskFinished);
        }
    }
}
/// Entry point of the long-running reclamation task.
///
/// Runs `task_loop` until it finishes.  If it fails, makes a best-effort
/// attempt to mark the tracker's `total_used` as poisoned (silently skipped
/// if the tracker is gone, not enabled, or its lock cannot be taken) and
/// then reports the error.
pub(super) async fn task(
    tracker: Weak<MemoryQuotaTracker>,
    wakeup: mpsc::Receiver<()>,
    enabled: EnabledToken,
) {
    let bug = match task_loop(&tracker, wakeup, enabled).await {
        Ok(TaskFinished) => return,
        Err(bug) => bug,
    };

    // Best effort: every step that fails simply skips the poisoning.
    if let Some(tracker) = tracker.upgrade() {
        if let Some(enabled_state) = tracker.state.as_enabled() {
            if let Ok(mut state) = enabled_state.lock() {
                state.total_used.set_poisoned();
            }
        }
    }

    error_report!(bug, "memory tracker task failed");
}