use std::time::Duration;
use log::{debug, warn};
use tokio::{io, time::sleep};
use crate::{
app::{MemoryBudgets, builder_defaults::default_memory_budgets},
message_assembler::MessageAssemblyState,
};
/// Numerator of the soft-limit fraction applied to the aggregate byte limit.
///
/// Together with [`SOFT_LIMIT_DENOMINATOR`] this places the soft limit at
/// 4/5 of the active aggregate limit.
const SOFT_LIMIT_NUMERATOR: u128 = 4;
/// Denominator of the soft-limit fraction; see [`SOFT_LIMIT_NUMERATOR`].
const SOFT_LIMIT_DENOMINATOR: u128 = 5;
/// Pause length carried by [`MemoryPressureAction::Pause`] when
/// [`evaluate_memory_pressure`] detects soft-limit pressure.
const SOFT_LIMIT_PAUSE_DURATION: Duration = Duration::from_millis(5);
/// Outcome of a memory-pressure evaluation.
///
/// Produced by [`evaluate_memory_pressure`] and consumed by
/// [`apply_memory_pressure`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum MemoryPressureAction {
    /// Neither the soft limit nor the hard cap applies; proceed without delay.
    Continue,
    /// The soft limit has been reached; wait for the given duration before
    /// carrying on.
    Pause(Duration),
    /// The hard cap has been exceeded; the operation must fail.
    Abort,
}
/// Classifies the current buffering level into a [`MemoryPressureAction`].
///
/// The hard cap is checked before the soft limit, so a breach of the hard cap
/// always yields [`MemoryPressureAction::Abort`]. When only the soft limit is
/// reached the result is [`MemoryPressureAction::Pause`] carrying
/// [`SOFT_LIMIT_PAUSE_DURATION`]; otherwise it is
/// [`MemoryPressureAction::Continue`].
///
/// If either `state` or `budgets` is `None`, both checks report `false` and
/// the result is `Continue`.
#[must_use]
pub(crate) fn evaluate_memory_pressure(
    state: Option<&MessageAssemblyState>,
    budgets: Option<MemoryBudgets>,
) -> MemoryPressureAction {
    if has_hard_cap_been_breached(state, budgets) {
        MemoryPressureAction::Abort
    } else if should_pause_inbound_reads(state, budgets) {
        MemoryPressureAction::Pause(SOFT_LIMIT_PAUSE_DURATION)
    } else {
        MemoryPressureAction::Continue
    }
}
/// Carries out a previously computed [`MemoryPressureAction`].
///
/// * `Continue` returns immediately.
/// * `Pause(duration)` logs at debug level, sleeps for `duration`, then
///   invokes `purge` once before returning.
/// * `Abort` logs a warning and fails.
///
/// # Errors
///
/// Returns an [`io::Error`] of kind [`io::ErrorKind::InvalidData`] when
/// `action` is [`MemoryPressureAction::Abort`].
pub(crate) async fn apply_memory_pressure(
    action: MemoryPressureAction,
    mut purge: impl FnMut(),
) -> io::Result<()> {
    // Resolve the two terminal outcomes first; only the pause path falls through.
    let pause = match action {
        MemoryPressureAction::Continue => return Ok(()),
        MemoryPressureAction::Abort => {
            warn!("memory budget hard cap exceeded; aborting connection");
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "per-connection memory budget hard cap exceeded",
            ));
        }
        MemoryPressureAction::Pause(duration) => duration,
    };

    debug!("soft memory budget pressure; pausing inbound reads");
    sleep(pause).await;
    // The purge callback runs only after the pause has elapsed.
    purge();
    Ok(())
}
/// Reports whether the buffered byte total strictly exceeds the active
/// aggregate limit.
///
/// Returns `false` when either `state` or `budgets` is `None`. Sitting
/// exactly at the limit does not count as a breach.
#[must_use]
pub(super) fn has_hard_cap_been_breached(
    state: Option<&MessageAssemblyState>,
    budgets: Option<MemoryBudgets>,
) -> bool {
    match (state, budgets) {
        (Some(assembly), Some(limits)) => {
            assembly.total_buffered_bytes() > active_aggregate_limit_bytes(limits)
        }
        _ => false,
    }
}
/// Reports whether the buffered byte total has reached the soft limit derived
/// from the active aggregate limit.
///
/// Returns `false` when either `state` or `budgets` is `None`.
#[must_use]
pub(super) fn should_pause_inbound_reads(
    state: Option<&MessageAssemblyState>,
    budgets: Option<MemoryBudgets>,
) -> bool {
    if let (Some(assembly), Some(limits)) = (state, budgets) {
        is_at_or_above_soft_limit(
            assembly.total_buffered_bytes(),
            active_aggregate_limit_bytes(limits),
        )
    } else {
        false
    }
}
/// Returns the tighter of the per-connection and in-flight byte budgets.
fn active_aggregate_limit_bytes(budgets: MemoryBudgets) -> usize {
    let per_connection = budgets.bytes_per_connection().as_usize();
    let in_flight = budgets.bytes_in_flight().as_usize();
    per_connection.min(in_flight)
}
/// Picks the memory budgets to enforce.
///
/// Returns `explicit` unchanged when it is provided; otherwise returns the
/// result of `default_memory_budgets(frame_budget)`. The defaults are only
/// computed when no explicit budgets are given.
#[must_use]
pub(crate) fn resolve_effective_budgets(
    explicit: Option<MemoryBudgets>,
    frame_budget: usize,
) -> MemoryBudgets {
    match explicit {
        Some(configured) => configured,
        None => default_memory_budgets(frame_budget),
    }
}
/// Tests whether `buffered_bytes` is at least
/// `SOFT_LIMIT_NUMERATOR / SOFT_LIMIT_DENOMINATOR` of `aggregate_limit`.
///
/// The comparison is cross-multiplied instead of dividing, so no precision is
/// lost to integer division. Both sides are widened to `u128` and multiplied
/// with saturation.
fn is_at_or_above_soft_limit(buffered_bytes: usize, aggregate_limit: usize) -> bool {
    // `usize` has no `From` conversion into `u128`; the `as` cast only widens.
    let scaled_usage = SOFT_LIMIT_DENOMINATOR.saturating_mul(buffered_bytes as u128);
    let scaled_threshold = SOFT_LIMIT_NUMERATOR.saturating_mul(aggregate_limit as u128);
    scaled_threshold <= scaled_usage
}