use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use minibytes::Bytes;
use minibytes::WeakBytes;
/// Value that `AVAILABLE` is reset to each time `adjust_available` triggers a page-out.
///
/// A value `<= 0` disables the page-out in `adjust_available` and, unless
/// `NEED_FIND_REGION` is set, also disables tracking in `track_mmap_buffer`.
pub(crate) static THRESHOLD: AtomicI64 = AtomicI64::new(DEFAULT_THRESHOLD);
/// When `true`, `track_mmap_buffer` records buffers even if `THRESHOLD` is not positive.
// NOTE(review): presumably set by code that relies on `find_region` — confirm at the writer.
pub(crate) static NEED_FIND_REGION: AtomicBool = AtomicBool::new(false);
/// Remaining budget, changed by `adjust_available`; once it drops below zero a
/// page-out may run (only while `THRESHOLD` is positive).
static AVAILABLE: AtomicI64 = AtomicI64::new(DEFAULT_THRESHOLD);
/// Weak handles to the buffers registered through `track_mmap_buffer`.
static BUFFERS: Mutex<Vec<WeakBytes>> = Mutex::new(Vec::new());
/// Initial value of both `THRESHOLD` and `AVAILABLE`: 2^31.
// NOTE(review): presumably a byte count (2 GiB) — confirm against callers of `adjust_available`.
const DEFAULT_THRESHOLD: i64 = 1i64 << 31;
/// Add `delta` (which may be negative) to the `AVAILABLE` budget and run a
/// page-out if the budget has dropped below zero.
///
/// The page-out only happens while `THRESHOLD` is positive. In that case
/// `AVAILABLE` is reset to the current threshold and `page_out` is run on the
/// tracked buffers, all while holding the `BUFFERS` lock.
///
/// # Panics
///
/// Panics if the `BUFFERS` mutex is poisoned.
pub(crate) fn adjust_available(delta: i64) {
    let previous = AVAILABLE.fetch_add(delta, Ordering::AcqRel);
    let budget_exhausted = previous + delta < 0;
    if !budget_exhausted {
        return;
    }

    let limit = THRESHOLD.load(Ordering::Acquire);
    if limit <= 0 {
        // Paging out is disabled.
        return;
    }

    // Take the lock before resetting the budget so the reset and the
    // page-out happen together with respect to other lock holders.
    let mut tracked = BUFFERS.lock().unwrap();
    AVAILABLE.store(limit, Ordering::Release);
    tracing::info!("running page_out()");
    page_out(&mut tracked);
}
/// Register `bytes` in the global `BUFFERS` list so that later page-out and
/// `find_region` passes can see it.
///
/// Nothing is recorded unless tracking is enabled, i.e. `THRESHOLD` is
/// positive or `NEED_FIND_REGION` is set. Buffers for which
/// `Bytes::downgrade` yields `None` are not tracked either.
///
/// # Panics
///
/// Panics if the `BUFFERS` mutex is poisoned and there is an entry to push.
pub(crate) fn track_mmap_buffer(bytes: &Bytes) {
    let tracking_enabled =
        THRESHOLD.load(Ordering::Acquire) > 0 || NEED_FIND_REGION.load(Ordering::Acquire);
    if !tracking_enabled {
        return;
    }

    // Downgrade before touching the global lock: the mutex is then held only
    // for the push itself, and not at all when there is nothing to record.
    if let Some(weak) = bytes.downgrade() {
        BUFFERS.lock().unwrap().push(weak);
    }
}
/// Issue `madvise(MADV_DONTNEED)` for every tracked buffer that is still
/// alive, and drop the entries whose buffer can no longer be upgraded.
///
/// The `madvise` return value is only logged at debug level; a failure does
/// not stop the pass or remove the entry.
#[cfg(unix)]
fn page_out(buffers: &mut Vec<WeakBytes>) {
    buffers.retain(|weak| {
        // Entries whose buffer is gone are pruned from the list.
        let bytes = match Bytes::upgrade(weak) {
            Some(live) => live,
            None => return false,
        };
        let data: &[u8] = bytes.as_ref();
        let addr = data.as_ptr() as *const libc::c_void as *mut libc::c_void;
        let len = data.len();
        // SAFETY: `addr` and `len` describe the slice borrowed from `bytes`,
        // which stays alive for the duration of the call.
        // NOTE(review): assumes tracked buffers are mmap-backed so that
        // discarding their pages is recoverable — confirm with the callers of
        // `track_mmap_buffer`.
        let ret = unsafe { libc::madvise(addr, len as libc::size_t, libc::MADV_DONTNEED) };
        tracing::debug!(
            "madvise({} bytes, MADV_DONTNEED) returned {}",
            len,
            ret
        );
        true
    });
}
/// Look up the tracked buffer whose memory range contains `addr`.
///
/// Returns `Some((start, len))` with the start address and byte length of the
/// first live tracked buffer containing `addr`.
///
/// Returns `None` when no live tracked buffer contains `addr`, or when the
/// `BUFFERS` lock cannot be taken without blocking (already held, or
/// poisoned).
// NOTE(review): the non-blocking `try_lock` suggests this is called from a
// context that must not wait on the lock — confirm with the callers.
#[cfg(unix)]
pub(crate) fn find_region(addr: usize) -> Option<(usize, usize)> {
    let tracked = BUFFERS.try_lock().ok()?;
    let hit = tracked.iter().find_map(|weak| {
        // Skip entries whose buffer has already been dropped.
        let bytes = Bytes::upgrade(weak)?;
        let start = bytes.as_ptr() as usize;
        let len = bytes.len();
        let end = start.wrapping_add(len);
        if addr >= start && addr < end {
            Some((start, len))
        } else {
            None
        }
    });
    hit
}
/// Windows page-out: call `EmptyWorkingSet` on the current process, then
/// forget all tracked buffers.
///
/// The return value of `EmptyWorkingSet` is only logged at debug level.
#[cfg(windows)]
fn page_out(buffers: &mut Vec<WeakBytes>) {
    use winapi::um::processthreadsapi::GetCurrentProcess;
    use winapi::um::psapi::EmptyWorkingSet;

    // SAFETY: neither call takes a pointer argument; the handle returned by
    // `GetCurrentProcess` is passed straight to `EmptyWorkingSet`.
    let ret = unsafe { EmptyWorkingSet(GetCurrentProcess()) };
    tracing::debug!("EmptyWorkingSet returned {}", ret);

    // The per-buffer list is not used on this platform's page-out path, so it
    // is simply emptied.
    buffers.clear();
}