use std::{sync::atomic::Ordering, time::Instant};
use humansize::make_format;
use metrics::atomics::AtomicU64;
use tracing::{debug, trace};
use crate::memory::{BufferKind, PagedPool};
/// 512 MiB — smallest memory limit this module is meant to operate with.
/// NOTE(review): the enforcement of this floor is not visible in this chunk — confirm at the
/// call site that configures `MemoryLimiter::new`.
pub const MINIMUM_MEM_LIMIT: u64 = 512 * 1024 * 1024;
/// Accounting category for a memory reservation; rendered via [`BufferArea::as_str`]
/// as the `area` label on the `mem.bytes_reserved` and `mem.reserve_latency_us` metrics.
// A fieldless public enum: derive the cheap standard traits so callers can copy
// and compare values without ceremony.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BufferArea {
    /// Memory reserved for uploads.
    Upload,
    /// Memory reserved for prefetching.
    Prefetch,
}

impl BufferArea {
    /// Returns the static label string used for metrics for this area.
    pub fn as_str(&self) -> &'static str {
        match self {
            BufferArea::Upload => "upload",
            BufferArea::Prefetch => "prefetch",
        }
    }
}
/// Enforces an upper bound on tracked memory usage.
///
/// Total usage is accounted as the sum of explicit reservations (`mem_reserved`),
/// bytes reserved by the buffer `pool`, and a fixed head-room
/// (`additional_mem_reserved`); `try_reserve` refuses requests that would push
/// that sum past `mem_limit`.
#[derive(Debug)]
pub struct MemoryLimiter {
// Target ceiling, in bytes, for total tracked memory usage.
mem_limit: u64,
// Bytes currently reserved via `reserve`/`try_reserve`; decreased by `release`.
mem_reserved: AtomicU64,
// Fixed head-room set aside at construction (mem_limit / 8, at least 128 MiB).
additional_mem_reserved: u64,
// Buffer pool whose reserved bytes also count against the limit.
pool: PagedPool,
}
impl MemoryLimiter {
/// Creates a limiter targeting `mem_limit` bytes of total tracked usage.
///
/// One eighth of the limit — but never less than 128 MiB — is set aside as
/// head-room (`additional_mem_reserved`) that reservations can never consume.
pub fn new(pool: PagedPool, mem_limit: u64) -> Self {
    // Head-room floor: even tiny limits keep 128 MiB in reserve.
    const MIN_HEADROOM: u64 = 128 * 1024 * 1024;
    let headroom = (mem_limit / 8).max(MIN_HEADROOM);

    let fmt = make_format(humansize::BINARY);
    debug!(
        "target memory usage is {} with {} reserved memory",
        fmt(mem_limit),
        fmt(headroom),
    );

    Self {
        pool,
        mem_limit,
        mem_reserved: AtomicU64::new(0),
        additional_mem_reserved: headroom,
    }
}
/// Unconditionally reserves `size` bytes — no limit check is performed here
/// (use [`MemoryLimiter::try_reserve`] for a checked reservation).
pub fn reserve(&self, area: BufferArea, size: u64) {
    self.mem_reserved.fetch_add(size, Ordering::SeqCst);
    let bytes_reserved = metrics::gauge!("mem.bytes_reserved", "area" => area.as_str());
    bytes_reserved.increment(size as f64);
}
/// Attempts to reserve `size` bytes, failing (reserving nothing) if the
/// reservation would push total tracked usage past `mem_limit`.
///
/// Total usage is the sum of explicit reservations, pool-held bytes, and the
/// fixed head-room. Returns `true` on success. The time spent in the attempt
/// is recorded to `mem.reserve_latency_us` on both outcomes.
pub fn try_reserve(&self, area: BufferArea, size: u64) -> bool {
    let start = Instant::now();
    let mut mem_reserved = self.mem_reserved.load(Ordering::SeqCst);
    let reserved = loop {
        let new_mem_reserved = mem_reserved.saturating_add(size);
        // Re-read the pool's usage on each retry: it can change concurrently.
        let new_total_mem_usage = new_mem_reserved
            .saturating_add(self.pool_mem_reserved())
            .saturating_add(self.additional_mem_reserved);
        if new_total_mem_usage > self.mem_limit {
            trace!(new_total_mem_usage, "not enough memory to reserve");
            break false;
        }
        // `compare_exchange_weak` may fail spuriously; on failure we retry the
        // whole check against the freshly observed value.
        match self.mem_reserved.compare_exchange_weak(
            mem_reserved,
            new_mem_reserved,
            Ordering::SeqCst,
            Ordering::SeqCst,
        ) {
            Ok(_) => {
                metrics::gauge!("mem.bytes_reserved", "area" => area.as_str()).increment(size as f64);
                break true;
            }
            Err(current) => mem_reserved = current,
        }
    };
    // Record the latency exactly once for both outcomes (this was previously
    // duplicated on each return path).
    metrics::histogram!("mem.reserve_latency_us", "area" => area.as_str())
        .record(start.elapsed().as_micros() as f64);
    reserved
}
/// Releases `size` bytes previously obtained via `reserve`/`try_reserve`.
pub fn release(&self, area: BufferArea, size: u64) {
    let prev = self.mem_reserved.fetch_sub(size, Ordering::SeqCst);
    // Releasing more than was reserved would wrap the u64 counter around to a
    // huge value and effectively disable the limiter; catch that caller bug in
    // debug builds (release-mode behavior is unchanged).
    debug_assert!(prev >= size, "released more memory than was reserved");
    metrics::gauge!("mem.bytes_reserved", "area" => area.as_str()).decrement(size as f64);
}
/// Bytes still available under the limit: `mem_limit` minus explicit
/// reservations, pool-held bytes, and the fixed head-room (never below zero).
pub fn available_mem(&self) -> u64 {
    // Sum everything that counts as "in use", then subtract once. Saturating
    // arithmetic clamps the result to zero when usage meets or exceeds the limit.
    let total_reserved = self
        .mem_reserved
        .load(Ordering::SeqCst)
        .saturating_add(self.pool_mem_reserved())
        .saturating_add(self.additional_mem_reserved);
    self.mem_limit.saturating_sub(total_reserved)
}
/// Total bytes the buffer pool currently has reserved, across the buffer
/// kinds that count against the limit.
fn pool_mem_reserved(&self) -> u64 {
    let put_object = self.pool.reserved_bytes(BufferKind::PutObject);
    let other = self.pool.reserved_bytes(BufferKind::Other);
    (put_object + other) as u64
}
}