use crate::core::memguard::{current_rss_mb, current_rss_mb_for_pid};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::time::Duration;
pub(super) fn spawn_memory_poller(
mem_limit: Option<u64>,
mem_abort: Arc<AtomicBool>,
peak_rss: Arc<AtomicU64>,
index_id: String,
) -> (tokio::task::JoinHandle<()>, Arc<AtomicBool>) {
const MEM_POLL_INTERVAL: Duration = Duration::from_secs(1);
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = stop.clone();
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(MEM_POLL_INTERVAL);
ticker.tick().await;
loop {
if stop_clone.load(AtomicOrdering::Acquire) {
break;
}
if let Some(rss) = current_rss_mb() {
let mut prev = peak_rss.load(AtomicOrdering::Acquire);
while rss > prev {
match peak_rss.compare_exchange_weak(
prev,
rss,
AtomicOrdering::AcqRel,
AtomicOrdering::Acquire,
) {
Ok(_) => break,
Err(cur) => prev = cur,
}
}
if let Some(limit) = mem_limit {
if rss >= limit && !mem_abort.load(AtomicOrdering::Acquire) {
tracing::warn!(
"reindex memory poller: rss={}MB >= limit={}MB \
— tripping abort flag for index {}",
rss,
limit,
index_id,
);
mem_abort.store(true, AtomicOrdering::Release);
}
}
}
ticker.tick().await;
}
});
(handle, stop)
}
pub(super) fn spawn_embedderd_rss_poller(
embedderd_pid_slot: Arc<AtomicU32>,
peak_embedderd_rss: Arc<AtomicU64>,
) -> (tokio::task::JoinHandle<()>, Arc<AtomicBool>) {
const EMBEDDERD_POLL_INTERVAL: Duration = Duration::from_millis(500);
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = stop.clone();
let handle = tokio::spawn(async move {
let mut ticker = tokio::time::interval(EMBEDDERD_POLL_INTERVAL);
ticker.tick().await;
loop {
if stop_clone.load(AtomicOrdering::Acquire) {
break;
}
let pid = embedderd_pid_slot.load(AtomicOrdering::Acquire);
if let Some(rss) = current_rss_mb_for_pid(pid) {
let mut prev = peak_embedderd_rss.load(AtomicOrdering::Acquire);
while rss > prev {
match peak_embedderd_rss.compare_exchange_weak(
prev,
rss,
AtomicOrdering::AcqRel,
AtomicOrdering::Acquire,
) {
Ok(_) => break,
Err(cur) => prev = cur,
}
}
}
ticker.tick().await;
}
});
(handle, stop)
}