1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
//! Implement half of log rate-limiting: the ability to cause the state of a
//! Loggable to get flushed at appropriate intervals.
use super::{Activity, Loggable};
use futures::task::SpawnExt as _;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tor_error::ErrorReport;
/// Declare a dyn-safe trait for the parts of an asynchronous runtime so that we
/// can install it globally.
pub(crate) mod rt {
    use futures::{future::BoxFuture, task::Spawn};
    use once_cell::sync::OnceCell;
    use std::time::{Duration, Instant};

    /// The subset of an async runtime that rate-limited logging relies on,
    /// expressed as a dyn-safe trait.
    pub trait RuntimeSupport: Spawn + Send + Sync + 'static {
        /// Return a future that will yield `()` after `duration` has passed.
        fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>;

        /// Return the current time as an `Instant`.
        fn now(&self) -> Instant;
    }

    // Every full `tor_rtcompat::Runtime` provides what we need: both methods
    // simply forward to its `SleepProvider` implementation.
    impl<R: tor_rtcompat::Runtime> RuntimeSupport for R {
        fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> {
            let sleeping = tor_rtcompat::SleepProvider::sleep(self, duration);
            Box::pin(sleeping)
        }

        fn now(&self) -> Instant {
            tor_rtcompat::SleepProvider::now(self)
        }
    }

    /// A global view of our runtime, used for rate-limited logging.
    // TODO MSRV 1.70: We could use `std::sync::OnceLock` instead.
    static RUNTIME_SUPPORT: OnceCell<Box<dyn RuntimeSupport>> = OnceCell::new();

    /// Try to install `runtime` as the global runtime to be used for
    /// rate-limited logging.
    ///
    /// Return an error (and make no changes) if there was already a runtime
    /// installed.
    pub fn install_runtime<R: tor_rtcompat::Runtime>(
        runtime: R,
    ) -> Result<(), InstallRuntimeError> {
        let boxed: Box<dyn RuntimeSupport> = Box::new(runtime);
        match RUNTIME_SUPPORT.set(boxed) {
            Ok(()) => Ok(()),
            // The cell hands the rejected value back; we have no use for it.
            Err(_rejected) => Err(InstallRuntimeError::DuplicateCall),
        }
    }

    /// An error that occurs while installing a runtime.
    #[derive(Clone, Debug, thiserror::Error)]
    #[non_exhaustive]
    pub enum InstallRuntimeError {
        /// Tried to install a runtime when there was already one installed.
        #[error("Called tor_log_ratelim::install_runtime() more than once")]
        DuplicateCall,
    }

    /// Return the installed runtime, if there is one.
    pub fn rt_support() -> Option<&'static dyn RuntimeSupport> {
        let installed = RUNTIME_SUPPORT.get()?;
        // Look through the `Box` to hand out a plain trait-object reference.
        Some(&**installed)
    }
}
/// A rate-limited wrapper around a [`Loggable`] that ensures its events are
/// flushed from time to time.
pub struct RateLim<T> {
/// The mutex-protected mutable state: the `Loggable` itself, together with
/// a flag recording whether a flushing task is running for it.
inner: Mutex<Inner<T>>,
}
/// The mutable state of a [`RateLim`].
struct Inner<T> {
/// The loggable state whose reports are rate-limited.
loggable: T,
/// True if we have a running task that is collating reports for `loggable`.
///
/// While this is false, `RateLim::nonevent` leaves `loggable` untouched.
task_running: bool,
}
impl<T: Loggable> RateLim<T> {
    /// Create a new `RateLim` to flush events for `loggable`.
    ///
    /// The new reporter starts out with no flushing task running.
    pub fn new(loggable: T) -> Arc<Self> {
        Arc::new(RateLim {
            inner: Mutex::new(Inner {
                loggable,
                task_running: false,
            }),
        })
    }

    /// Adjust the status of this reporter's `Loggable` by calling `f` on it,
    /// but only if it is already scheduled to report itself. Otherwise, do nothing.
    ///
    /// This is the appropriate function to use for tracking successes.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn nonevent<F>(&self, f: F)
    where
        F: FnOnce(&mut T),
    {
        let mut inner = self.inner.lock().expect("lock poisoned");
        if inner.task_running {
            f(&mut inner.loggable);
        }
    }

    /// Add an event to this rate-limited reporter by calling `f` on it, and
    /// schedule it to be reported after an appropriate time.
    ///
    /// If no flushing task is running, one is spawned on `rt`. If spawning
    /// fails, the state is flushed immediately and the reporter is left marked
    /// as having no task, so that the next event tries again.
    ///
    /// NOTE: This API is a bit ugly. If we ever decide to make it non-hidden,
    /// we may want to make it call rt_support() directly again, as it did in
    /// earlier versions.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F)
    where
        F: FnOnce(&mut T),
    {
        let mut inner = self.inner.lock().expect("poisoned lock");
        f(&mut inner.loggable);

        if !inner.task_running {
            // Launch a task to make periodic reports on the state of our Loggable.
            //
            // We still hold the lock here, so the new task cannot observe the
            // state until we are done updating it.
            inner.task_running = true;
            if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) {
                // No task was created, so don't claim that one is running:
                // otherwise every later event would be recorded but never
                // flushed, and we would never try to spawn again.
                inner.task_running = false;

                // We couldn't spawn a task; we have to flush the state
                // immediately.
                //
                // TODO: This behavior is undesirable if it causes us to spam
                // the log while we are shutting down. On the other hand, it's
                // also undesirable if we suppress our logs while we're
                // shutting down.
                inner.loggable.flush(Duration::default());
                tracing::warn!("Also, unable to spawn a logging task: {}", e.report());
            }
        }
    }
}
/// Once we have had nothing to report for approximately this long (four
/// hours), we should reset our timeout schedule.
const RESET_AFTER_DORMANT_FOR: Duration = Duration::from_secs(4 * 60 * 60);
/// Return an endless iterator of reasonable amounts of time to summarize.
///
/// The intervals start short and grow as the event keeps happening: 5
/// seconds, 1 minute, 5 minutes, 30 minutes (twice), 1 hour (twice), 4 hours
/// (twice), and then 24 hours forever after.
fn timeout_sequence() -> impl Iterator<Item = Duration> {
    /// Seconds per minute.
    const MINUTE: u64 = 60;
    /// Seconds per hour.
    const HOUR: u64 = 60 * MINUTE;
    /// The initial, gradually lengthening intervals, in seconds.
    static RAMP_UP: [u64; 9] = [
        5,
        MINUTE,
        5 * MINUTE,
        30 * MINUTE,
        30 * MINUTE,
        HOUR,
        HOUR,
        4 * HOUR,
        4 * HOUR,
    ];

    RAMP_UP
        .iter()
        .copied()
        // After the ramp-up, settle on one summary per day.
        .chain(std::iter::repeat(24 * HOUR))
        .map(Duration::from_secs)
}
/// Helper: the body of the background task that periodically flushes the
/// `Loggable` in `ratelim`.
///
/// The task sleeps for each interval produced by `timeout_sequence()` in turn
/// and flushes after every sleep. It clears `task_running` and exits once
/// [`Loggable::flush`] has been returning [`Activity::Dormant`] for at least
/// `RESET_AFTER_DORMANT_FOR`, measured from the first dormant flush of the
/// current streak.
async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>)
// TODO : Perhaps instead of taking an Arc<RateLim<T>> we want sometimes to take
// a `&'static RateLim<T>`, so we don't need to mess about with `Arc`s needlessly.
where
    T: Loggable,
{
    // When the current unbroken streak of dormant flushes began, if we are in one.
    let mut dormant_since = None;

    for interval in timeout_sequence() {
        rt_support.sleep(interval).await;

        // Keep the guard in its own scope so that it is released before the
        // next sleep.
        {
            let mut state = ratelim.inner.lock().expect("Lock poisoned");
            debug_assert!(state.task_running);

            // NOTE: We say that we are summarizing "interval" on the theory
            // that we actually slept for "interval". But maybe `sleep` slept
            // for a little more or less? Nonetheless, we report this as if we
            // had slept for the exact amount, since the alternative appears to
            // be saying stuff like "this problem occurred 8/12 times in the
            // last 10min 0.0014sec" instead of "10m".
            if state.loggable.flush(interval) != Activity::Dormant {
                // Something was reported: any dormant streak is over.
                dormant_since = None;
            } else if let Some(began) = dormant_since {
                // TODO: This can tell the user several times that the problem
                // did not occur! Perhaps we only want to flush once on dormant,
                // and then not report the dormant condition again until we are
                // no longer tracking it. Or perhaps we should lower the
                // responsibility for deciding when to log and when to uninstall
                // to the Loggable?
                let idle = rt_support.now().checked_duration_since(began);
                if idle.map_or(false, |idle| idle >= RESET_AFTER_DORMANT_FOR) {
                    state.task_running = false;
                    return;
                }
            } else {
                // First dormant flush of a new streak: start the clock.
                dormant_since = Some(rt_support.now());
            }
        }
    }

    unreachable!("timeout_sequence returned a finite sequence");
}
// TODO : Write some tests.