#![cfg_attr(
feature = "memquota",
doc = "let trk = MemoryQuotaTracker::new(&runtime, config).unwrap();"
)]
#![cfg_attr(
not(feature = "memquota"),
doc = "let trk = MemoryQuotaTracker::new_noop();"
)]
#![forbid(unsafe_code)]
use crate::internal_prelude::*;
use IfEnabled::*;
mod bookkeeping;
mod reclaim;
mod total_qty_notifier;
#[cfg(all(test, feature = "memquota", not(miri) /* coarsetime */))]
pub(crate) mod test;
use bookkeeping::{BookkeepableQty, ClaimedQty, ParticipQty, TotalQty};
use total_qty_notifier::TotalQtyNotifier;
/// Upper bound on a `Participation`'s local cache.
///
/// This is also the amount claimed up front when a participant is registered,
/// and the level above which `release_qty` starts returning cache to the tracker.
pub(crate) const MAX_CACHE: Qty = Qty(16384);
/// Level the local cache is topped up to after a claim had to go to the tracker
/// (three quarters of `MAX_CACHE`).
const TARGET_CACHE_CLAIMING: Qty = Qty(MAX_CACHE.as_usize() * 3 / 4);
/// Level the local cache is shrunk to once a release pushes it above `MAX_CACHE`
/// (one quarter of `MAX_CACHE`).
// The `* 1` is a no-op; presumably kept so this reads like `TARGET_CACHE_CLAIMING`.
#[allow(clippy::identity_op)] const TARGET_CACHE_RELEASING: Qty = Qty(MAX_CACHE.as_usize() * 1 / 4);
/// Memory quota tracker.
///
/// Built with [`MemoryQuotaTracker::new`], or with
/// [`MemoryQuotaTracker::new_noop`] for a tracker that does no tracking.
/// Both constructors hand it out inside an `Arc`.
#[derive(Debug)]
pub struct MemoryQuotaTracker {
/// The mutex-protected tracking state, or `Noop` for a no-op tracker.
state: IfEnabled<Mutex<State>>,
}
/// Handle onto an account within a [`MemoryQuotaTracker`].
///
/// Wraps an [`AccountInner`], or `Noop` when tracking is disabled.
/// Cloning takes a further counted reference to the same account record;
/// dropping releases it (see the `Clone` and `Drop` impls).
#[derive(Educe)]
#[educe(Debug)]
pub struct Account(IfEnabled<AccountInner>);
/// Contents of an enabled [`Account`].
#[derive(Educe)]
#[educe(Debug)]
pub struct AccountInner {
/// Counted reference to this account's record in the tracker's `accounts` map.
aid: refcount::Ref<AId>,
/// The tracker the account belongs to (strong reference; omitted from `Debug` output).
#[educe(Debug(ignore))]
tracker: Arc<MemoryQuotaTracker>,
}
/// Weak handle onto an account.
///
/// Holds only the account's key and a `Weak` tracker reference, so it does not
/// by itself take a counted reference to the account record.
/// Use [`WeakAccount::upgrade`] to obtain a usable [`Account`].
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct WeakAccount(IfEnabled<WeakAccountInner>);
/// Contents of an enabled [`WeakAccount`].
#[derive(Clone, Educe)]
#[educe(Debug)]
pub struct WeakAccountInner {
/// Key of the account record (a plain key, not a counted reference).
aid: AId,
/// The tracker the account belongs to (weak reference; omitted from `Debug` output).
#[educe(Debug(ignore))]
tracker: Weak<MemoryQuotaTracker>,
}
/// Handle through which a participant claims and releases memory.
///
/// Wraps a [`ParticipationInner`], or `Noop` when tracking is disabled.
/// Obtained from [`Account::register_participant`] or
/// [`Account::register_participant_with`].
#[derive(Debug)]
pub struct Participation(IfEnabled<ParticipationInner>);
/// Contents of an enabled [`Participation`].
#[derive(Debug)]
pub struct ParticipationInner {
/// Counted reference to the participant's record within its account.
pid: refcount::Ref<PId>,
/// Key of the account the participant belongs to.
aid: AId,
/// The tracker (weak reference).
tracker: Weak<MemoryQuotaTracker>,
/// Quantity already claimed from the tracker but not currently handed to the
/// participant; claims are served from here first and releases land here first.
cache: ClaimedQty,
}
/// Callbacks the tracker needs from a participant.
///
/// A `Weak<dyn IsParticipant>` is supplied when the participant is registered
/// (see [`Account::register_participant`]).
pub trait IsParticipant: Debug + Send + Sync + 'static {
/// Report a timestamp for this participant, or `None`.
///
/// NOTE(review): judging by the name, this is the age of the oldest data the
/// participant holds, with `None` meaning it holds nothing; confirm against
/// the reclamation code, which is not part of this file.
fn get_oldest(&self, _: EnabledToken) -> Option<CoarseInstant>;
/// Ask the participant to give up memory.
///
/// The returned future resolves to a [`Reclaimed`] describing the outcome.
fn reclaim(
self: Arc<Self>,
_: EnabledToken,
) -> ReclaimFuture;
}
/// Boxed future returned by [`IsParticipant::reclaim`], resolving to a [`Reclaimed`].
pub type ReclaimFuture = Pin<Box<dyn Future<Output = Reclaimed> + Send + Sync>>;
/// Outcome of a reclamation request (the output of a [`ReclaimFuture`]).
///
/// Marked `#[non_exhaustive]`, so further variants may be added.
#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
#[non_exhaustive]
pub enum Reclaimed {
/// NOTE(review): presumably means the participant is tearing itself down
/// entirely in response to the request; confirm against the reclamation code.
Collapsing,
}
// Slotmap key types used inside the tracker state.
slotmap_careful::new_key_type! {
// Key of an account record (`State::accounts`).
struct AId;
// Key of a participant record within an account (`ARecord::ps`).
struct PId;
}
/// Everything protected by the tracker's mutex.
///
/// Dereferences (mutably) to [`Global`], so a `&mut State` can be passed where
/// a `&mut Global` is expected.
#[derive(Debug, Deref, DerefMut)]
struct State {
/// Tracker-wide totals and configuration.
#[deref]
#[deref_mut]
global: Global,
/// All live accounts, by key.
accounts: SlotMap<AId, ARecord>,
}
/// Tracker-wide state that is not specific to any account.
#[derive(Debug)]
struct Global {
/// Total quantity currently claimed, plus the means to wake the reclamation task.
total_used: TotalQtyNotifier,
/// Current limits; replaced wholesale by `MemoryQuotaTracker::reconfigure`.
config: ConfigInner,
/// Token showing that tracking is enabled; never read in this file.
#[allow(dead_code)]
enabled: EnabledToken,
}
/// Per-account record stored in `State::accounts`.
///
/// A record removed from the map must have `auto_release` called on it rather
/// than simply being dropped (hence `#[must_use]`).
#[derive(Debug)]
#[must_use = "don't just drop, call auto_release"]
struct ARecord {
/// Count of the `Account` handles referring to this record.
refcount: refcount::Count<AId>,
/// Keys of child accounts.
///
/// May contain keys of accounts that no longer exist: stale entries are only
/// pruned in `State::prepare_parent_aid` and are skipped when walking the tree.
children: Vec<AId>,
/// This account's participants, by key.
ps: SlotMap<PId, PRecord>,
/// Token showing that tracking is enabled; never read in this file.
#[allow(dead_code)]
enabled: EnabledToken,
}
/// Per-participant record stored in `ARecord::ps`.
///
/// A record removed from the map must have `auto_release` called on it rather
/// than simply being dropped (hence `#[must_use]`).
#[derive(Debug)]
#[must_use = "don't just drop, call auto_release"]
struct PRecord {
/// Count of the `Participation` handles referring to this record.
refcount: refcount::Count<PId>,
/// Quantity currently accounted to this participant.
used: ParticipQty,
/// The participant's callbacks, held weakly.
particip: drop_reentrancy::ProtectedWeak<dyn IsParticipant>,
/// Token showing that tracking is enabled; never read in this file.
#[allow(dead_code)]
enabled: EnabledToken,
}
/// Lock the tracker and look up an account record, and optionally a participant record.
///
/// Expands to a series of `let` statements in the caller's scope, binding:
///
///  * `$tracker` (only with the `+ $tracker,` form): the `Arc<MemoryQuotaTracker>`
///    obtained by calling `.upgrade()` on `$tracker_input`.
///    Without that form, `$tracker_input` is used directly and must give access
///    to a `MemoryQuotaTracker`'s `state` field.
///  * `$state`: `&mut State`, borrowed from a mutex guard which stays locked
///    until the end of the enclosing block.
///  * `$arecord`: `&mut ARecord` for the account `$aid`.
///  * `$precord` (only if `$pid => $precord;` is given): `&mut PRecord` for `$pid`.
///
/// `$enabled` must be usable as `&EnabledToken`; it is only type-checked.
///
/// The final `? Error` or `? None` selects how failures are propagated;
/// see `find_in_tracker_eh`.  With `? Error` the failures are, in order:
/// `TrackerShutdown`, `Bug` (no-op tracker), `TrackerCorrupted`,
/// `AccountClosed` and `ParticipantShutdown`.
macro_rules! find_in_tracker { {
$enabled:expr;
$tracker_input:expr => $( + $tracker:ident, )? $state:ident;
$aid:expr => $arecord:ident;
$( $pid:expr => $precord:ident; )?
? $eh:tt
} => {
let tracker = &$tracker_input;
// Optional step: upgrade a `Weak` tracker reference, and use the result from here on.
$(
let $tracker: Arc<MemoryQuotaTracker> = find_in_tracker_eh!(
$eh Error::TrackerShutdown;
tracker.upgrade()
);
let tracker = &$tracker;
)?
// Type-check the token expression; the value itself is not needed.
let _: &EnabledToken = &$enabled;
let state = find_in_tracker_eh!(
$eh Error::Bug(internal!("noop MemoryQuotaTracker found via enabled datastructure"));
tracker.state.as_enabled()
);
// A failed `lock()` is turned into `None` and reported as `TrackerCorrupted`.
let mut state: MutexGuard<State> = find_in_tracker_eh!(
$eh Error::TrackerCorrupted;
state.lock().ok()
);
let $state: &mut State = &mut *state;
let aid: AId = $aid;
let $arecord: &mut ARecord = find_in_tracker_eh!(
$eh Error::AccountClosed;
$state.accounts.get_mut(aid)
);
// Optional step: look up the participant within the account.
$(
let pid: PId = $pid;
let $precord: &mut PRecord = find_in_tracker_eh!(
$eh Error::ParticipantShutdown;
$arecord.ps.get_mut(pid)
);
)?
} }
/// Error-handling helper for `find_in_tracker`.
///
/// `$result` is an `Option`.  The leading token selects what happens on `None`:
///
///  * `None`: apply `?` directly, so the enclosing function or closure returns `None`;
///    `$variant` is ignored.
///  * `Error`: convert `None` into `Err($variant)` and apply `?`.
macro_rules! find_in_tracker_eh {
{ None $variant:expr; $result:expr } => { $result? };
{ Error $variant:expr; $result:expr } => { $result.ok_or_else(|| $variant)? };
}
impl MemoryQuotaTracker {
/// Set up a new tracker and spawn its reclamation task onto `runtime`.
///
/// If `config` is the disabled configuration, a no-op tracker is returned
/// and nothing is spawned.
///
/// # Errors
///
/// Fails if the reclamation task cannot be spawned.
pub fn new<R: Spawn>(runtime: &R, config: Config) -> Result<Arc<Self>, StartupError> {
    let (config, enabled) = match config.0 {
        Enabled(config, enabled) => (config, enabled),
        Noop => return Ok(MemoryQuotaTracker::new_noop()),
    };

    // The sending half goes to the usage notifier, the receiving half to the task.
    let (reclaim_tx, reclaim_rx) = mpsc_channel_no_memquota(0);
    let total_used = TotalQtyNotifier::new_zero(reclaim_tx);

    // Copy the limits out; they are only needed for the log message below.
    let ConfigInner { max, low_water } = config;

    let state = State {
        global: Global {
            total_used,
            config,
            enabled,
        },
        accounts: SlotMap::default(),
    };
    let tracker = Arc::new(MemoryQuotaTracker {
        state: Enabled(Mutex::new(state), enabled),
    });

    // The task is handed only a weak reference to the tracker.
    let for_task = Arc::downgrade(&tracker);
    runtime.spawn(reclaim::task(for_task, reclaim_rx, enabled))?;

    info!(%max, %low_water, "Memory quota tracking initialised");
    Ok(tracker)
}
/// Apply a new configuration to this tracker.
///
/// * No-op tracker, disabled `new_config`: nothing to do.
/// * No-op tracker, enabled `new_config`: tracking cannot be switched on after
///   startup; this is reported through `how.cannot_change`.
/// * Enabled tracker, disabled `new_config`: both limits are set to `Qty::MAX`
///   rather than turning the tracker into a no-op one.
/// * Enabled tracker, enabled `new_config`: the new limits replace the old ones.
///
/// With `Reconfigure::CheckAllOrNothing` the configuration is only checked,
/// never applied.
///
/// # Errors
///
/// Returns an internal error if the tracker is corrupted or `how` is an
/// unrecognised variant, and whatever `how.cannot_change` returns in the
/// "cannot enable" case.
pub fn reconfigure(
    &self,
    new_config: Config,
    how: tor_config::Reconfigure,
) -> Result<(), ReconfigureError> {
    use tor_config::Reconfigure;

    let guard = self.lock().map_err(into_internal!(
        "cannot reconfigure corrupted memquota tracker"
    ))?;

    let Enabled(mut state, _enabled) = guard else {
        // We are a no-op tracker: only a disabled configuration is acceptable.
        return match new_config.0 {
            Noop => Ok(()),
            Enabled(..) => how.cannot_change(
                "tor-memquota max (`system.memory.max`) cannot be set: cannot enable memory quota tracking, when disabled at program start"
            ),
        };
    };

    // A disabled configuration becomes "limits at maximum".
    let new_config = new_config.0.into_enabled().unwrap_or(ConfigInner {
        max: Qty::MAX,
        low_water: Qty::MAX,
    });

    match how {
        Reconfigure::CheckAllOrNothing => return Ok(()),
        Reconfigure::AllOrNothing | Reconfigure::WarnOnFailures => {}
        _ => return Err(internal!("Reconfigure variant unknown! {how:?}").into()),
    }

    let global = &mut state.global;
    global.config = new_config;
    // Let the notifier re-evaluate current usage against the new limits.
    global.total_used.maybe_wakeup(&global.config);
    Ok(())
}
/// Return the tracker's current total usage figure.
///
/// For a no-op tracker this is always `usize::MAX`.
///
/// # Errors
///
/// Fails if the tracker state cannot be locked.
pub fn used_current_approx(&self) -> Result<usize, TrackerCorrupted> {
    match self.lock()? {
        Enabled(state, _enabled) => Ok(*state.total_used.as_raw()),
        Noop => Ok(usize::MAX),
    }
}
/// Create a new account in this tracker, optionally as a child of `parent`.
///
/// On a no-op tracker this returns a no-op `Account`.
///
/// # Errors
///
/// Fails if the tracker state cannot be locked, or if `parent` is unusable
/// (a no-op account, or one whose record no longer exists).
#[allow(clippy::redundant_closure_call)]
pub fn new_account(self: &Arc<Self>, parent: Option<&Account>) -> crate::Result<Account> {
    let (mut state, enabled) = match self.lock()? {
        Enabled(state, enabled) => (state, enabled),
        Noop => return Ok(Account(Noop)),
    };

    // Validate the parent before anything is inserted.
    let parent_aid_good = match parent {
        Some(parent) => Some(state.prepare_parent_aid(parent)?),
        None => None,
    };

    // From here on nothing can fail; the closure keeps this section free of early exits.
    let account = (|| {
        let aid = refcount::slotmap_insert(&mut state.accounts, |refcount| ARecord {
            refcount,
            children: vec![],
            ps: SlotMap::default(),
            enabled,
        });

        if let Some(parent_aid_good) = parent_aid_good {
            let parent_record = state
                .accounts
                .get_mut(parent_aid_good)
                .expect("parent vanished!");
            parent_record.children.push(*aid);
        }

        let inner = AccountInner {
            aid,
            tracker: Arc::clone(self),
        };
        Account(Enabled(inner, enabled))
    })();

    Ok(account)
}
/// Create a tracker that does no tracking at all.
pub fn new_noop() -> Arc<MemoryQuotaTracker> {
    let state = Noop;
    Arc::new(MemoryQuotaTracker { state })
}
/// Lock the tracker state.
///
/// Returns `Noop` for a no-op tracker, otherwise the mutex guard together with
/// the `EnabledToken`.
///
/// # Errors
///
/// Fails if the mutex cannot be locked.
fn lock(&self) -> Result<IfEnabled<MutexGuard<State>>, TrackerCorrupted> {
    match &self.state {
        Enabled(mutex, enabled) => Ok(Enabled(mutex.lock()?, *enabled)),
        Noop => Ok(Noop),
    }
}
}
impl Account {
/// Register a new participant in this account.
///
/// A full `MAX_CACHE` is claimed from the tracker straight away and placed in
/// the returned `Participation`'s local cache.
///
/// On a no-op account this returns a no-op `Participation`.
///
/// # Errors
///
/// Fails if the tracker or the account cannot be found, or if the initial
/// claim is refused.
pub fn register_participant(
    &self,
    particip: Weak<dyn IsParticipant>,
) -> Result<Participation, Error> {
    let (acct, enabled) = match &self.0 {
        Enabled(acct, enabled) => (acct, enabled),
        Noop => return Ok(Participation(Noop)),
    };
    let aid = *acct.aid;

    find_in_tracker! {
        enabled;
        acct.tracker => state;
        aid => arecord;
        ?Error
    }

    let (pid, cache) = refcount::slotmap_try_insert(&mut arecord.ps, |refcount| {
        let mut precord = PRecord {
            refcount,
            used: ParticipQty::ZERO,
            particip: drop_reentrancy::ProtectedWeak::new(particip),
            enabled: *enabled,
        };
        // Pre-fill the cache; if this fails, no record is inserted.
        let cache = state
            .global
            .total_used
            .claim(&mut precord, MAX_CACHE, &state.global.config)?;
        Ok::<_, Error>((precord, cache))
    })?;

    Ok(Participation(Enabled(
        ParticipationInner {
            tracker: Arc::downgrade(&acct.tracker),
            pid,
            aid,
            cache,
        },
        *enabled,
    )))
}
/// Replace the callbacks stored for participant `pid` of account `aid`.
///
/// Does nothing on a no-op account.
///
/// # Errors
///
/// Fails if the tracker, the account or the participant cannot be found.
fn set_participant_callbacks(
    &self,
    aid: AId,
    pid: PId,
    particip: drop_reentrancy::ProtectedWeak<dyn IsParticipant>,
) -> Result<(), Error> {
    match &self.0 {
        Noop => Ok(()),
        Enabled(acct, enabled) => {
            find_in_tracker! {
                enabled;
                acct.tracker => state;
                aid => arecord;
                pid => precord;
                ?Error
            }
            precord.particip = particip;
            Ok(())
        }
    }
}
/// Register a participant whose construction itself needs a `Participation`.
///
/// A temporary placeholder participant is registered first, so that
/// `constructor` can be given a `Participation`.  Once `constructor` has
/// produced the real participant, its callbacks replace the placeholder's.
///
/// `now` is what the placeholder reports from `get_oldest`.
///
/// On a no-op account, `constructor` is simply called with a no-op `Participation`.
///
/// # Returns
///
/// The outer `Result` reports tracker-side failures; the inner one is
/// whatever `constructor` returned.
///
/// # Errors
///
/// Besides the errors of `register_participant` and of installing the real
/// callbacks, returns `Error::ParticipantShutdown` if the placeholder was asked
/// to reclaim before the real participant was installed.
pub fn register_participant_with<P: IsParticipant, X, E>(
&self,
now: CoarseInstant,
constructor: impl FnOnce(Participation) -> Result<(Arc<P>, X), E>,
) -> Result<Result<(Arc<P>, X), E>, Error> {
let Enabled(_self, _enabled) = &self.0 else {
return Ok(constructor(Participation(Noop)));
};
use std::sync::atomic::{AtomicBool, Ordering};
// Placeholder participant, in place only while `constructor` runs.
#[derive(Debug)]
struct TemporaryParticipant {
// Timestamp reported from `get_oldest`.
now: CoarseInstant,
// Set once `reclaim` has been called on the placeholder.
collapsing: AtomicBool,
}
impl IsParticipant for TemporaryParticipant {
fn get_oldest(&self, _: EnabledToken) -> Option<CoarseInstant> {
Some(self.now)
}
fn reclaim(self: Arc<Self>, _: EnabledToken) -> ReclaimFuture {
// Just record the request; it is acted on after `constructor` returns.
self.collapsing.store(true, Ordering::Release);
Box::pin(async { Reclaimed::Collapsing })
}
}
let temp_particip = Arc::new(TemporaryParticipant {
now,
collapsing: false.into(),
});
let partn = self.register_participant(Arc::downgrade(&temp_particip) as _)?;
// Note the keys now: `partn` is about to be moved into `constructor`.
let partn_ = partn
.0
.as_enabled()
.ok_or_else(|| internal!("Enabled Account gave Noop Participant"))?;
let aid = partn_.aid;
let pid_weak = *partn_.pid;
let (particip, xdata) = match constructor(partn) {
Ok(y) => y,
Err(e) => return Ok(Err(e)),
};
let particip = drop_reentrancy::ProtectedArc::new(particip);
let r = (|| {
let weak = {
let weak = particip.downgrade();
// Unprotect, cast `Weak<P>` to `Weak<dyn IsParticipant>`, and protect again.
drop_reentrancy::ProtectedWeak::new(weak.unprotect() as _)
};
self.set_participant_callbacks(aid, pid_weak, weak)?;
// If the placeholder was asked to reclaim in the meantime, report shutdown.
if temp_particip.collapsing.load(Ordering::Acquire) {
return Err(Error::ParticipantShutdown);
}
Ok(())
})();
// Unwrap the protected `Arc` before propagating `r`, so this happens on both paths.
let particip = particip.promise_dropping_is_ok();
r?;
Ok(Ok((particip, xdata)))
}
/// Create a new account which is a child of this one.
///
/// On a no-op account this returns another no-op account.
///
/// # Errors
///
/// Same as [`MemoryQuotaTracker::new_account`].
pub fn new_child(&self) -> crate::Result<Self> {
    match &self.0 {
        Enabled(acct, _enabled) => acct.tracker.new_account(Some(self)),
        Noop => Ok(Account::new_noop()),
    }
}
/// Make this account a child of `parent`, in addition to any existing parents.
///
/// Does nothing if this account, or its tracker, is a no-op.
///
/// # Errors
///
///  * The tracker state cannot be locked, or `parent` is unusable.
///  * `parent` is this account or one of its descendants, which would create a
///    cycle (reported as a bad-API-usage error).
///  * This account is already a child of `parent`
///    (`Error::ChildAccountAlreadyExists`).
pub fn add_parent(&self, parent: &Account) -> Result<(), Error> {
    let Enabled(acc, _enabled) = &self.0 else {
        return Ok(());
    };
    let Enabled(mut state, _enabled) = acc.tracker.lock()? else {
        return Ok(());
    };

    let parent_aid_good = state.prepare_parent_aid(parent)?;
    let own_aid = *acc.aid;

    // Refuse to create a cycle.
    let own_subtree = state.get_aid_and_children_recursively(own_aid);
    if own_subtree.contains(&parent_aid_good) {
        let err = tor_error::bad_api_usage!(
            "tried to create circular parent relationship?!"
        );
        return Err(err.into());
    }

    let parent_record = state
        .accounts
        .get_mut(parent_aid_good)
        .expect("parent vanished!");
    if parent_record.children.contains(&own_aid) {
        return Err(Error::ChildAccountAlreadyExists);
    }
    parent_record.children.push(own_aid);
    Ok(())
}
/// Return the tracker this account belongs to.
///
/// For a no-op account, a fresh no-op tracker is returned.
pub fn tracker(&self) -> Arc<MemoryQuotaTracker> {
    match &self.0 {
        Enabled(acct, _enabled) => Arc::clone(&acct.tracker),
        Noop => MemoryQuotaTracker::new_noop(),
    }
}
/// Obtain a weak handle onto this account.
///
/// The result holds the account's key and a weak tracker reference only.
pub fn downgrade(&self) -> WeakAccount {
    match &self.0 {
        Enabled(acct, enabled) => WeakAccount(Enabled(
            WeakAccountInner {
                aid: *acct.aid,
                tracker: Arc::downgrade(&acct.tracker),
            },
            *enabled,
        )),
        Noop => WeakAccount(Noop),
    }
}
pub fn new_noop() -> Self {
Account(IfEnabled::Noop)
}
}
impl Clone for Account {
fn clone(&self) -> Account {
let Enabled(self_, enabled) = &self.0 else {
return Account(Noop);
};
let tracker = self_.tracker.clone();
let aid = (|| {
let aid = *self_.aid;
find_in_tracker! {
enabled;
tracker => state;
aid => arecord;
?None
}
let aid = refcount::Ref::new(aid, &mut arecord.refcount).ok()?;
Some(aid)
})()
.unwrap_or_else(|| {
refcount::Ref::null()
});
let inner = AccountInner { aid, tracker };
Account(Enabled(inner, *enabled))
}
}
impl Drop for Account {
/// Give up this handle's counted reference to the account record.
///
/// If that leaves the record as garbage, everything its participants still
/// hold is released via `auto_release`.
fn drop(&mut self) {
let Enabled(self_, enabled) = &mut self.0 else {
return;
};
(|| {
find_in_tracker! {
enabled;
self_.tracker => state;
*self_.aid => arecord;
?None
}
// `Garbage(removed)` hands back the record when it is no longer referenced.
if let Some(refcount::Garbage(mut removed)) =
slotmap_dec_ref!(&mut state.accounts, self_.aid.take(), &mut arecord.refcount)
{
// `state` is accepted as `&mut Global` because `State` derefs to `Global`.
removed.auto_release(state);
}
Some(())
})()
.unwrap_or_else(|| {
// The tracker or the account record could not be found:
// dispose of our reference without touching any count.
self_.aid.take().dispose_container_destroyed();
});
}
}
impl WeakAccount {
pub fn upgrade(&self) -> crate::Result<Account> {
let Enabled(self_, enabled) = &self.0 else {
return Ok(Account(Noop));
};
let aid = self_.aid;
let tracker = self_.tracker.upgrade().ok_or(Error::TrackerShutdown)?;
let aid = {
find_in_tracker! {
enabled;
tracker => state;
aid => arecord;
?Error
}
refcount::Ref::new(aid, &mut arecord.refcount)?
};
let inner = AccountInner { aid, tracker };
Ok(Account(Enabled(inner, *enabled)))
}
/// Return a weak reference to the tracker this account belongs to.
///
/// For a no-op `WeakAccount` the result is an empty `Weak`.
pub fn tracker(&self) -> Weak<MemoryQuotaTracker> {
    match &self.0 {
        Enabled(weak, _enabled) => weak.tracker.clone(),
        Noop => Weak::default(),
    }
}
/// Create a `WeakAccount` that refers to no account and no tracker.
///
/// If `EnabledToken::new_if_compiled_in` yields no token, the result is a
/// no-op `WeakAccount` instead.
pub fn new_dangling() -> Self {
    match EnabledToken::new_if_compiled_in() {
        Some(enabled) => WeakAccount(Enabled(
            WeakAccountInner {
                aid: AId::default(),
                tracker: Weak::default(),
            },
            enabled,
        )),
        None => WeakAccount(Noop),
    }
}
}
impl Participation {
/// Claim `want` units of memory for this participant.
///
/// # Errors
///
/// Same as `claim_qty`.
pub fn claim(&mut self, want: usize) -> crate::Result<()> {
    let want = Qty(want);
    self.claim_qty(want)
}
/// Claim `want` for this participant, reporting any failure via `trace_report!`.
///
/// # Errors
///
/// Passes on whatever `claim_qty_inner` returns.
pub(crate) fn claim_qty(&mut self, want: Qty) -> crate::Result<()> {
    let outcome = self.claim_qty_inner(want);
    outcome.inspect_err(|e| trace_report!(e, "claim {}", want))
}
/// Claim `want`, from the local cache if possible, otherwise from the tracker.
///
/// Does nothing (successfully) on a no-op `Participation`.
///
/// # Errors
///
/// Fails if the tracker, account or participant cannot be found, or if the
/// tracker refuses the claim.
fn claim_qty_inner(&mut self, want: Qty) -> crate::Result<()> {
let Enabled(self_, enabled) = &mut self.0 else {
return Ok(());
};
// Debug builds only: check that tracker, account and participant still exist,
// even when the claim could be served from the cache.
#[cfg(debug_assertions)]
{
find_in_tracker! {
enabled;
self_.tracker => + tracker, state;
self_.aid => _arecord;
*self_.pid => _precord;
?Error
};
}
// Fast path: serve the claim from the local cache, without locking the tracker.
if let Some(got) = self_.cache.split_off(want) {
return got.claim_return_to_participant();
}
// Slow path: the cache could not supply `want`, so ask the tracker.
find_in_tracker! {
enabled;
self_.tracker => + tracker, state;
self_.aid => arecord;
*self_.pid => precord;
?Error
};
let mut claim = |want| -> Result<ClaimedQty, _> {
state
.global
.total_used
.claim(precord, want, &state.global.config)
};
let got = claim(want)?;
if want <= TARGET_CACHE_CLAIMING {
// While the tracker is locked, top the cache up to `TARGET_CACHE_CLAIMING`.
// NOTE(review): the `expect` relies on `split_off` having failed above
// because cache < want, and want <= TARGET_CACHE_CLAIMING here.
let want_more_cache = TARGET_CACHE_CLAIMING
.checked_sub(*self_.cache.as_raw())
.expect("but cache < want");
let want_more_cache = Qty(want_more_cache);
// A failed top-up is ignored: `got` is already in hand and must still
// be passed on below.
if let Ok(add_cache) = claim(want_more_cache) {
self_.cache.merge_into(add_cache);
}
}
got.claim_return_to_participant()
}
/// Record that this participant has given up `have` units of memory.
pub fn release(&mut self, have: usize) {
    let have = Qty(have);
    self.release_qty(have);
}
/// Record that this participant has given up `have`.
///
/// The quantity goes into the local cache first.  Only when the cache then
/// exceeds `MAX_CACHE` is the tracker locked, and the cache reduced to
/// `TARGET_CACHE_RELEASING` by returning the excess.
///
/// Does nothing on a no-op `Participation`.
pub(crate) fn release_qty(&mut self, have: Qty) {
let Enabled(self_, enabled) = &mut self.0 else {
return;
};
let have = ClaimedQty::release_got_from_participant(have);
self_.cache.merge_into(have);
if self_.cache > MAX_CACHE {
match (|| {
find_in_tracker! {
enabled;
self_.tracker => + tracker, state;
self_.aid => arecord;
*self_.pid => precord;
?None
}
// Cannot underflow: cache > MAX_CACHE here, and TARGET_CACHE_RELEASING is
// a quarter of MAX_CACHE.
let return_from_cache = self_
.cache
.as_raw()
.checked_sub(*TARGET_CACHE_RELEASING)
.expect("TARGET_CACHE_RELEASING > MAX_CACHE ?!");
let return_from_cache = Qty(return_from_cache);
// The amount was computed from the cache's own size, so it is expected to be available.
let from_cache = self_
.cache
.split_off(return_from_cache)
.expect("impossible");
state.global.total_used.release(precord, from_cache);
Some(())
})() {
// The tracker, account or participant could not be found: there is
// nowhere to return the cache to, so dispose of it all.
Some(()) => {} None => {
self_.cache.take().dispose_participant_destroyed();
}
}
}
}
/// Return a weak handle onto the account this participation belongs to.
pub fn account(&self) -> WeakAccount {
    match &self.0 {
        Enabled(partn, enabled) => WeakAccount(Enabled(
            WeakAccountInner {
                aid: partn.aid,
                tracker: partn.tracker.clone(),
            },
            *enabled,
        )),
        Noop => WeakAccount(Noop),
    }
}
/// Remove this participant from its account straight away.
///
/// The participant's record is removed regardless of other outstanding
/// handles, and whatever it still holds is released.  If the tracker or the
/// account cannot be found, nothing happens here.
pub fn destroy_participant(mut self) {
    let (partn, enabled) = match &mut self.0 {
        Enabled(partn, enabled) => (partn, enabled),
        Noop => return,
    };

    // A failed lookup is not an error here, so the outcome is discarded.
    let _: Option<()> = (|| {
        find_in_tracker! {
            enabled;
            partn.tracker => + tracker, state;
            partn.aid => arecord;
            ?None
        };
        let removed = refcount::slotmap_remove_early(&mut arecord.ps, partn.pid.take());
        if let Some(mut precord) = removed {
            precord.auto_release(&mut state.global);
        }
        Some(())
    })();

    // `self` is dropped on return; its `pid` has already been taken above
    // whenever the lookup succeeded.
}
/// Create a `Participation` that refers to no participant, account or tracker.
///
/// Its cache starts at zero.  If `EnabledToken::new_if_compiled_in` yields no
/// token, the result is a no-op `Participation` instead.
pub fn new_dangling() -> Self {
    match EnabledToken::new_if_compiled_in() {
        Some(enabled) => Participation(Enabled(
            ParticipationInner {
                pid: refcount::Ref::default(),
                aid: AId::default(),
                tracker: Weak::default(),
                cache: ClaimedQty::ZERO,
            },
            enabled,
        )),
        None => Participation(Noop),
    }
}
}
impl Clone for Participation {
    /// Produce another handle onto the same participant.
    ///
    /// The clone starts with an empty local cache.  If a new counted reference
    /// cannot be obtained (tracker, account or participant cannot be found, or
    /// `refcount::Ref::new` fails), the clone carries a null reference instead.
    fn clone(&self) -> Participation {
        let (partn, enabled) = match &self.0 {
            Enabled(partn, enabled) => (partn, enabled),
            Noop => return Participation(Noop),
        };
        let aid = partn.aid;
        let tracker: Weak<_> = partn.tracker.clone();

        let new_ref = (|| {
            let pid = *partn.pid;
            find_in_tracker! {
                enabled;
                partn.tracker => + tracker_strong, state;
                aid => _arecord;
                pid => precord;
                ?None
            }
            let counted = refcount::Ref::new(pid, &mut precord.refcount).ok()?;
            Some(counted)
        })();
        let pid = new_ref.unwrap_or_else(|| refcount::Ref::null());

        Participation(Enabled(
            ParticipationInner {
                aid,
                pid,
                cache: ClaimedQty::ZERO,
                tracker,
            },
            *enabled,
        ))
    }
}
impl Drop for Participation {
/// Return the local cache to the tracker and give up this handle's counted
/// reference to the participant record.
///
/// If that leaves the record as garbage, whatever it still holds is released
/// via `auto_release`.
fn drop(&mut self) {
let Enabled(self_, enabled) = &mut self.0 else {
return;
};
(|| {
find_in_tracker! {
enabled;
self_.tracker => + tracker_strong, state;
self_.aid => arecord;
*self_.pid => precord;
?None
}
// Hand back whatever is still sitting in the local cache.
let from_cache = self_.cache.take();
state.global.total_used.release(precord, from_cache);
// `Garbage(removed)` hands back the record when it is no longer referenced.
if let Some(refcount::Garbage(mut removed)) =
slotmap_dec_ref!(&mut arecord.ps, self_.pid.take(), &mut precord.refcount)
{
removed.auto_release(&mut state.global);
}
Some(())
})()
.unwrap_or_else(|| {
// The tracker, account or participant record could not be found
// (for instance after `destroy_participant`):
// dispose of our reference and cache without touching the tracker.
self_.pid.take().dispose_container_destroyed();
self_.cache.take().dispose_participant_destroyed();
});
}
}
impl State {
/// Collect `parent_aid` and every account reachable from it through `children`.
///
/// Keys that have no record in `accounts` are skipped, so the result contains
/// only live accounts; in particular it is empty if `parent_aid` itself has no
/// record.  Each account is expanded at most once.
fn get_aid_and_children_recursively(&self, parent_aid: AId) -> HashSet<AId> {
    let mut seen: HashSet<AId> = HashSet::new();
    let mut pending: Vec<AId> = vec![parent_aid];

    while let Some(aid) = pending.pop() {
        if let Some(record) = self.accounts.get(aid) {
            // Only expand an account the first time it is encountered.
            if seen.insert(aid) {
                pending.extend(record.children.iter().copied());
            }
        }
    }

    seen
}
#[allow(clippy::redundant_closure_call)] fn prepare_parent_aid(&mut self, parent: &Account) -> crate::Result<AId> {
let Enabled(parent, _enabled) = &parent.0 else {
return Err(internal!("used no-op Account as parent for enabled account").into());
};
let parent_aid = *parent.aid;
let parent_arecord = self
.accounts
.get_mut(parent_aid)
.ok_or(Error::AccountClosed)?;
if !parent_arecord.children.spare_capacity_mut().is_empty() {
return Ok(parent_aid);
}
let mut parent_children = mem::take(&mut parent_arecord.children);
(|| {
parent_children.retain(|child_aid| self.accounts.contains_key(*child_aid));
self.accounts
.get_mut(parent_aid)
.expect("parent vanished!")
.children = parent_children;
})();
Ok::<_, Error>(parent_aid)
}
}
impl ARecord {
    /// Release everything still held by this account's participants.
    ///
    /// Empties `ps`, calling `PRecord::auto_release` on each removed record.
    fn auto_release(&mut self, global: &mut Global) {
        self.ps
            .drain()
            .for_each(|(_pid, mut precord)| precord.auto_release(global));
    }
}
impl PRecord {
    /// Release whatever is still accounted to this participant back to the total.
    fn auto_release(&mut self, global: &mut Global) {
        // Computed first, as a separate statement: `self` is borrowed again
        // for the `release` call.
        let outstanding = self.used.for_participant_teardown();
        global.total_used.release(self, outstanding);
    }
}