use super::{Activity, Loggable};
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tor_error::ErrorReport;
use tor_rtcompat::SpawnExt as _;
pub(crate) mod rt {
use futures::{future::BoxFuture, task::Spawn};
use std::sync::OnceLock;
use std::time::{Duration, Instant};
pub trait RuntimeSupport: Spawn + 'static + Sync + Send {
fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>;
fn now(&self) -> Instant;
}
impl<R: tor_rtcompat::Runtime> RuntimeSupport for R {
fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> {
Box::pin(tor_rtcompat::SleepProvider::sleep(self, duration))
}
fn now(&self) -> Instant {
tor_rtcompat::SleepProvider::now(self)
}
}
static RUNTIME_SUPPORT: OnceLock<Box<dyn RuntimeSupport>> = OnceLock::new();
pub fn install_runtime<R: tor_rtcompat::Runtime>(
runtime: R,
) -> Result<(), InstallRuntimeError> {
let rt = Box::new(runtime);
RUNTIME_SUPPORT
.set(rt)
.map_err(|_| InstallRuntimeError::DuplicateCall)
}
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum InstallRuntimeError {
#[error("Called tor_log_ratelim::install_runtime() more than once")]
DuplicateCall,
}
pub fn rt_support() -> Option<&'static dyn RuntimeSupport> {
RUNTIME_SUPPORT.get().map(Box::as_ref)
}
}
pub struct RateLim<T> {
inner: Mutex<Inner<T>>,
}
struct Inner<T> {
loggable: T,
task_running: bool,
}
impl<T: Loggable> RateLim<T> {
pub fn new(loggable: T) -> Arc<Self> {
Arc::new(RateLim {
inner: Mutex::new(Inner {
loggable,
task_running: false,
}),
})
}
pub fn nonevent<F>(&self, f: F)
where
F: FnOnce(&mut T),
{
let mut inner = self.inner.lock().expect("lock poisoned");
if inner.task_running {
f(&mut inner.loggable);
}
}
pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F)
where
F: FnOnce(&mut T),
{
let mut inner = self.inner.lock().expect("poisoned lock");
f(&mut inner.loggable);
if !inner.task_running {
inner.task_running = true;
if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) {
inner.loggable.flush(Duration::default());
tracing::warn!("Also, unable to spawn a logging task: {}", e.report());
}
}
}
}
const RESET_AFTER_DORMANT_FOR: Duration = Duration::new(4 * 60 * 60, 0);
fn timeout_sequence() -> impl Iterator<Item = Duration> {
const SEC: u64 = 1;
const MIN: u64 = 60;
const HOUR: u64 = 3600;
[
5 * SEC,
MIN,
5 * MIN,
30 * MIN,
30 * MIN,
HOUR,
HOUR,
4 * HOUR,
4 * HOUR,
]
.into_iter()
.chain(std::iter::repeat(24 * HOUR))
.map(|secs| Duration::new(secs, 0))
}
async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>)
where
T: Loggable,
{
let mut dormant_since = None;
for duration in timeout_sequence() {
rt_support.sleep(duration).await;
{
let mut inner = ratelim.inner.lock().expect("Lock poisoned");
debug_assert!(inner.task_running);
if inner.loggable.flush(duration) == Activity::Dormant {
match dormant_since {
Some(when) => {
if let Some(dormant_for) = rt_support.now().checked_duration_since(when) {
if dormant_for >= RESET_AFTER_DORMANT_FOR {
inner.task_running = false;
return;
}
}
}
None => {
dormant_since = Some(rt_support.now());
}
}
} else {
dormant_since = None;
}
}
}
unreachable!("timeout_sequence returned a finite sequence");
}