use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicI8, AtomicU64, AtomicUsize, Ordering};
use std::sync::OnceLock;
/// Running count of HTML bytes reported as held in memory; adjusted by
/// `track_bytes_add` / `track_bytes_sub` and read by `total_bytes_in_memory`.
static TOTAL_HTML_BYTES_IN_MEMORY: AtomicUsize = AtomicUsize::new(0);
/// Number of pages reported as spooled to disk; adjusted by
/// `track_page_spooled` / `track_page_unspooled` and read by `pages_on_disk`.
static PAGES_ON_DISK: AtomicUsize = AtomicUsize::new(0);
/// Monotonic counter used to build unique file names in the spool directory
/// (`next_spool_path`, and the marker file of the test-only `flush_cleanup`).
static SPOOL_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Last value stored by `refresh_cached_mem_state` (starts at 0).
/// `should_spool` reads it: `>= 2` spools every page above the minimum size,
/// `>= 1` applies a tighter per-page limit plus the total memory budget.
static CACHED_MEM_STATE: AtomicI8 = AtomicI8::new(0);
/// Lazily initialised sending half of the deletion queue; see `cleanup_sender`.
static CLEANUP_TX: OnceLock<tokio::sync::mpsc::UnboundedSender<PathBuf>> = OnceLock::new();
/// Returns the process-wide sender used to queue spool files for deletion.
///
/// The first call creates an unbounded channel and starts a single consumer:
/// a task on the current Tokio runtime when one is available, otherwise a
/// dedicated OS thread named `spider-spool-cleanup`. Deletion errors are
/// ignored in both consumers.
///
/// # Panics
///
/// Panics if no runtime is available and the cleanup thread cannot be spawned.
fn cleanup_sender() -> &'static tokio::sync::mpsc::UnboundedSender<PathBuf> {
    CLEANUP_TX.get_or_init(|| {
        let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::<PathBuf>();
        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                // Async consumer: lives on whichever runtime performed the first call.
                runtime.spawn(async move {
                    while let Some(doomed) = receiver.recv().await {
                        let target = doomed.display().to_string();
                        let _ = crate::utils::uring_fs::remove_file(target).await;
                    }
                });
            }
            Err(_) => {
                // No runtime in scope: drain the queue from a blocking thread instead.
                std::thread::Builder::new()
                    .name("spider-spool-cleanup".into())
                    .spawn(move || {
                        while let Some(doomed) = receiver.blocking_recv() {
                            let _ = std::fs::remove_file(&doomed);
                        }
                    })
                    .expect("failed to spawn spool cleanup thread");
            }
        }
        sender
    })
}
#[inline]
pub fn queue_spool_delete(path: PathBuf) {
let _ = cleanup_sender().send(path);
}
/// Test helper: waits until the cleanup consumer has processed everything
/// queued before this call.
///
/// A uniquely named marker file is written into the spool directory and sent
/// through the deletion queue; the function then spins (yielding the thread)
/// until the marker no longer exists or two seconds have elapsed.
#[cfg(test)]
pub fn flush_cleanup() {
    let dir = spool_dir();
    let seq = SPOOL_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
    let marker = dir.join(format!(".flush_{}", seq));
    let _ = std::fs::write(&marker, b"");
    let _ = cleanup_sender().send(marker.clone());

    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
    loop {
        if !marker.exists() || std::time::Instant::now() >= deadline {
            break;
        }
        std::thread::yield_now();
    }
}
/// Minimum page size, in bytes, below which `should_spool` never spools.
///
/// Read once from the `SPIDER_HTML_SPOOL_MIN_SIZE` environment variable and
/// cached; an unset or unparsable value yields the 64 KiB default.
fn spool_min_size() -> usize {
    const DEFAULT_MIN_SIZE: usize = 64 * 1024;
    static CACHED: OnceLock<usize> = OnceLock::new();
    let resolve = || match std::env::var("SPIDER_HTML_SPOOL_MIN_SIZE") {
        Ok(raw) => raw.parse::<usize>().unwrap_or(DEFAULT_MIN_SIZE),
        Err(_) => DEFAULT_MIN_SIZE,
    };
    *CACHED.get_or_init(resolve)
}
/// Lazily initialised spool directory for this process; populated by `spool_dir`.
static SPOOL_DIR: OnceLock<SpoolDirHandle> = OnceLock::new();
/// Spool directory state: the temp-dir guard plus the path spool files are placed in.
struct SpoolDirHandle {
// Held only to keep the `TempDir` guard alive; never read through this field.
_dir: tempfile::TempDir,
// Directory returned by `spool_dir`. NOTE(review): in `spool_dir`'s fallback
// branch this is the custom directory, not `_dir`'s own path.
path: PathBuf,
}
/// Total in-memory HTML budget, in bytes, consulted by `should_spool`.
///
/// Read once from `SPIDER_HTML_MEMORY_BUDGET` and cached; an unset or
/// unparsable value yields the 2 GiB default.
fn base_memory_budget() -> usize {
    const DEFAULT_BUDGET: usize = 2 * 1024 * 1024 * 1024;
    static CACHED: OnceLock<usize> = OnceLock::new();
    *CACHED.get_or_init(|| {
        if let Ok(raw) = std::env::var("SPIDER_HTML_MEMORY_BUDGET") {
            if let Ok(parsed) = raw.parse::<usize>() {
                return parsed;
            }
        }
        DEFAULT_BUDGET
    })
}
/// Per-page size limit, in bytes, consulted by `should_spool`.
///
/// Read once from `SPIDER_HTML_PAGE_SPOOL_SIZE` and cached; an unset or
/// unparsable value yields the 80 MiB default.
fn base_per_page_threshold() -> usize {
    static CACHED: OnceLock<usize> = OnceLock::new();
    let compute = || -> usize {
        let configured = std::env::var("SPIDER_HTML_PAGE_SPOOL_SIZE").ok();
        configured
            .as_deref()
            .and_then(|text| text.parse().ok())
            .unwrap_or(80 * 1024 * 1024)
    };
    *CACHED.get_or_init(compute)
}
/// Adds `n` bytes to the global in-memory HTML byte counter.
#[inline]
pub fn track_bytes_add(n: usize) {
    let _previous = TOTAL_HTML_BYTES_IN_MEMORY.fetch_add(n, Ordering::Relaxed);
}
#[inline]
pub fn track_bytes_sub(n: usize) {
let _ = TOTAL_HTML_BYTES_IN_MEMORY.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
Some(cur.saturating_sub(n))
});
}
#[inline]
pub fn total_bytes_in_memory() -> usize {
TOTAL_HTML_BYTES_IN_MEMORY.load(Ordering::Relaxed)
}
/// Records that one more page now lives on disk.
#[inline]
pub fn track_page_spooled() {
    let _previous = PAGES_ON_DISK.fetch_add(1, Ordering::Relaxed);
}
#[inline]
pub fn track_page_unspooled() {
let _ = PAGES_ON_DISK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
Some(cur.saturating_sub(1))
});
}
#[inline]
pub fn pages_on_disk() -> usize {
PAGES_ON_DISK.load(Ordering::Relaxed)
}
/// Samples the process memory state and caches it for `should_spool`.
#[inline]
pub fn refresh_cached_mem_state() {
    let sampled = crate::utils::detect_system::get_process_memory_state_sync();
    CACHED_MEM_STATE.store(sampled, Ordering::Relaxed);
}
/// Decides whether a page of `html_len` bytes should be spooled to disk.
///
/// * Pages at or below `spool_min_size()` are never spooled.
/// * Cached memory state `>= 2`: always spool.
/// * Cached memory state `>= 1`: spool when the page exceeds a quarter of the
///   per-page threshold, or when adding it would push the tracked in-memory
///   total past `base_memory_budget()`.
/// * Otherwise: spool only when the page exceeds the per-page threshold.
#[inline]
pub fn should_spool(html_len: usize) -> bool {
    if html_len <= spool_min_size() {
        return false;
    }

    let per_page_limit = base_per_page_threshold();
    let pressure = CACHED_MEM_STATE.load(Ordering::Relaxed);

    if pressure >= 2 {
        return true;
    }
    if pressure >= 1 {
        // The budget is only consulted when the tightened per-page limit did not trigger.
        return html_len > per_page_limit / 4
            || total_bytes_in_memory().saturating_add(html_len) > base_memory_budget();
    }
    html_len > per_page_limit
}
/// Returns the directory spool files are written to, creating it on first use.
///
/// When `SPIDER_HTML_SPOOL_DIR` is set, that directory is created if needed
/// and a `spider_html_*` temp dir is made inside it. If that fails, the custom
/// directory itself is used as the spool path. Without the variable, a
/// `spider_html_*` temp dir is created in the system temp location.
///
/// # Panics
///
/// Panics if the required temporary directory cannot be created.
pub fn spool_dir() -> &'static Path {
    let handle = SPOOL_DIR.get_or_init(|| match std::env::var("SPIDER_HTML_SPOOL_DIR") {
        Ok(custom) => {
            let base = PathBuf::from(&custom);
            let _ = std::fs::create_dir_all(&base);
            let scoped = tempfile::Builder::new()
                .prefix("spider_html_")
                .tempdir_in(&base);
            if let Ok(td) = scoped {
                let path = td.path().to_path_buf();
                SpoolDirHandle { _dir: td, path }
            } else {
                // NOTE(review): the handle owns an unrelated fallback temp dir here
                // while `path` is the custom directory itself — confirm this is
                // intended, since `cleanup_spool_dir` removes `path` recursively.
                let placeholder = tempfile::Builder::new()
                    .prefix("spider_html_fallback_")
                    .tempdir()
                    .expect("failed to create temp dir");
                SpoolDirHandle {
                    _dir: placeholder,
                    path: base,
                }
            }
        }
        Err(_) => {
            let td = tempfile::Builder::new()
                .prefix("spider_html_")
                .tempdir()
                .expect("failed to create temp dir for HTML spool");
            let path = td.path().to_path_buf();
            SpoolDirHandle { _dir: td, path }
        }
    });
    handle.path.as_path()
}
/// Returns a fresh `<counter>.sphtml` path inside the spool directory.
///
/// Each call consumes one value of the global spool file counter.
pub fn next_spool_path() -> PathBuf {
    let seq = SPOOL_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
    let file_name = format!("{}.sphtml", seq);
    spool_dir().join(file_name)
}
/// Writes `data` to `path`, creating the file or truncating an existing one.
///
/// # Errors
///
/// Returns the I/O error from creating or writing the file.
pub fn spool_write(path: &Path, data: &[u8]) -> std::io::Result<()> {
    use std::io::Write;
    let mut file = std::fs::File::create(path)?;
    file.write_all(data)
}
/// Reads the whole file at `path` into a byte vector.
///
/// # Errors
///
/// Returns the I/O error from opening or reading the file.
pub fn spool_read(path: &Path) -> std::io::Result<Vec<u8>> {
    let contents = std::fs::read(path)?;
    Ok(contents)
}
/// Reads the whole file at `path` and returns it as `bytes::Bytes`.
///
/// # Errors
///
/// Returns the I/O error from opening or reading the file.
pub fn spool_read_bytes(path: &Path) -> std::io::Result<bytes::Bytes> {
    let raw = std::fs::read(path)?;
    Ok(bytes::Bytes::from(raw))
}
/// Removes the file at `path` on a best-effort basis; errors (including a
/// missing file) are ignored.
pub fn spool_delete(path: &Path) {
    std::fs::remove_file(path).ok();
}
/// Asynchronously reads the whole file at `path` via `uring_fs::read_file`
/// and returns it as `bytes::Bytes`.
///
/// The path is handed to the helper as a string built with `Path::display`,
/// which is lossy for non-UTF-8 paths.
pub async fn spool_read_bytes_async(path: std::path::PathBuf) -> std::io::Result<bytes::Bytes> {
    let location = path.display().to_string();
    let raw = crate::utils::uring_fs::read_file(location).await?;
    Ok(bytes::Bytes::from(raw))
}
/// Asynchronously reads the whole file at `path` via `uring_fs::read_file`.
///
/// The path is handed to the helper as a string built with `Path::display`,
/// which is lossy for non-UTF-8 paths.
pub async fn spool_read_async(path: std::path::PathBuf) -> std::io::Result<Vec<u8>> {
    let location = path.display().to_string();
    crate::utils::uring_fs::read_file(location).await
}
/// Asynchronously writes `data` to `path` via `uring_fs::write_file`.
///
/// `data` is copied into an owned buffer before being handed to the helper,
/// and the path is passed as a string built with `Path::display` (lossy for
/// non-UTF-8 paths).
pub async fn spool_write_async(path: &Path, data: &[u8]) -> std::io::Result<()> {
    let location = path.display().to_string();
    let payload = data.to_vec();
    crate::utils::uring_fs::write_file(location, payload).await
}
/// Asynchronously streams the file at `path` to `cb` by delegating to
/// `uring_fs::read_file_chunked` with the given `chunk_size`.
///
/// Returns whatever count the helper reports. The path is passed as a string
/// built with `Path::display` (lossy for non-UTF-8 paths).
pub async fn spool_stream_chunks_async(
    path: std::path::PathBuf,
    chunk_size: usize,
    cb: impl FnMut(&[u8]) -> bool,
) -> std::io::Result<usize> {
    let location = path.display().to_string();
    crate::utils::uring_fs::read_file_chunked(location, chunk_size, cb).await
}
/// Removes spool data created by this process, if the spool directory was
/// ever initialised. All errors are ignored.
///
/// In the normal case the spool path is the temp dir owned by the handle and
/// is removed recursively. When `spool_dir` fell back to using the custom
/// `SPIDER_HTML_SPOOL_DIR` directly, that directory belongs to the user: only
/// `*.sphtml` files inside it are deleted, the directory itself is kept, and
/// the otherwise unused fallback temp dir is removed.
pub fn cleanup_spool_dir() {
    let Some(handle) = SPOOL_DIR.get() else {
        return;
    };
    let owned_root = handle._dir.path();

    if handle.path.as_path() == owned_root {
        let _ = std::fs::remove_dir_all(owned_root);
        return;
    }

    // Fallback mode: never wipe the user's directory wholesale.
    if let Ok(entries) = std::fs::read_dir(&handle.path) {
        for entry in entries.flatten() {
            let candidate = entry.path();
            if candidate.extension().is_some_and(|ext| ext == "sphtml") {
                let _ = std::fs::remove_file(&candidate);
            }
        }
    }
    let _ = std::fs::remove_dir_all(owned_root);
}
/// Streams the file at `path` to `cb` in chunks of at most `chunk_size` bytes.
///
/// `cb` receives each chunk in order and returns `true` to continue or
/// `false` to stop early. A `chunk_size` of zero falls back to 64 KiB, since
/// a zero-length buffer could never deliver any data.
///
/// Returns the number of bytes read from the file, including the chunk on
/// which `cb` asked to stop.
///
/// # Errors
///
/// Returns the I/O error from opening or reading the file. Reads that fail
/// with `ErrorKind::Interrupted` are retried rather than reported.
pub fn spool_stream_chunks<F>(path: &Path, chunk_size: usize, mut cb: F) -> std::io::Result<usize>
where
    F: FnMut(&[u8]) -> bool,
{
    use std::io::Read;

    const FALLBACK_CHUNK_SIZE: usize = 64 * 1024;
    let effective_size = if chunk_size == 0 {
        FALLBACK_CHUNK_SIZE
    } else {
        chunk_size
    };

    let mut file = std::fs::File::open(path)?;
    let mut buf = vec![0u8; effective_size];
    let mut total = 0usize;
    loop {
        let n = match file.read(&mut buf) {
            Ok(0) => break,
            Ok(n) => n,
            // Interrupted reads are non-fatal and should simply be retried.
            Err(err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(err) => return Err(err),
        };
        total = total.saturating_add(n);
        if !cb(&buf[..n]) {
            break;
        }
    }
    Ok(total)
}
// Unit tests for the spool accounting counters, the spool decision, and the
// synchronous file helpers.
#[cfg(test)]
pub(crate) mod tests {
use super::*;
// Byte counter: add/sub move the total relative to its starting value, and
// subtracting more than the total clamps at zero.
// NOTE(review): the counter is a process-wide static, so these exact-value
// assertions presumably assume no other test mutates it concurrently.
#[test]
fn test_byte_accounting_saturating() {
let base = total_bytes_in_memory();
track_bytes_add(1000);
assert_eq!(total_bytes_in_memory(), base + 1000);
track_bytes_sub(600);
assert_eq!(total_bytes_in_memory(), base + 400);
track_bytes_sub(400);
assert_eq!(total_bytes_in_memory(), base);
let before_sat = total_bytes_in_memory();
// Over-subtract by one to exercise the saturating path.
track_bytes_sub(before_sat + 1);
assert_eq!(total_bytes_in_memory(), 0);
// Restore the value that was present before the saturation check.
track_bytes_add(before_sat);
}
// Page counter: spooled/unspooled calls move the count relative to its
// starting value.
#[test]
fn test_page_disk_counter() {
{
let base = pages_on_disk();
track_page_spooled();
track_page_spooled();
assert_eq!(pages_on_disk(), base + 2);
track_page_unspooled();
assert_eq!(pages_on_disk(), base + 1);
track_page_unspooled();
assert_eq!(pages_on_disk(), base);
}
}
// Spool decision: sizes at or below the minimum are never spooled; sizes
// above the per-page threshold always are.
// NOTE(review): the mid-range expectations presumably rely on the cached
// memory state still being 0 and on default thresholds — confirm.
#[test]
fn test_should_spool_decision() {
assert!(!should_spool(100));
assert!(!should_spool(spool_min_size()));
assert!(!should_spool(200 * 1024)); assert!(!should_spool(5 * 1024 * 1024)); assert!(!should_spool(10 * 1024 * 1024));
assert!(should_spool(base_per_page_threshold() + 1));
}
// Round trip through spool_write / spool_read / spool_read_bytes, then delete.
#[test]
fn test_spool_write_read_delete() {
let dir = std::env::temp_dir().join("spider_spool_test_rw");
let _ = std::fs::create_dir_all(&dir);
let path = dir.join("test.sphtml");
let data = b"<html><body>hello</body></html>";
spool_write(&path, data).unwrap();
let read_back = spool_read(&path).unwrap();
assert_eq!(&read_back, data);
let bytes = spool_read_bytes(&path).unwrap();
assert_eq!(&bytes[..], data);
spool_delete(&path);
assert!(!path.exists());
// Deleting an already-removed file must not panic.
spool_delete(&path);
let _ = std::fs::remove_dir_all(&dir);
}
// Reading a path that was never written reports an error.
#[test]
fn test_spool_read_nonexistent() {
let path = std::env::temp_dir().join("spider_spool_does_not_exist.sphtml");
assert!(spool_read(&path).is_err());
assert!(spool_read_bytes(&path).is_err());
}
// Streaming with a 10-byte chunk size reassembles the 26-byte payload and
// reports its full length.
#[test]
fn test_spool_stream_chunks() {
let dir = std::env::temp_dir().join("spider_spool_stream_test2");
let _ = std::fs::create_dir_all(&dir);
let path = dir.join("stream.sphtml");
let data = b"abcdefghijklmnopqrstuvwxyz";
spool_write(&path, data).unwrap();
let mut collected = Vec::new();
let total = spool_stream_chunks(&path, 10, |chunk| {
collected.extend_from_slice(chunk);
true
})
.unwrap();
assert_eq!(collected, data);
assert_eq!(total, data.len());
spool_delete(&path);
let _ = std::fs::remove_dir_all(&dir);
}
// The callback returning false stops the stream: it is invoked exactly
// three times because it returns false on the third call.
#[test]
fn test_spool_stream_early_stop() {
let dir = std::env::temp_dir().join("spider_spool_stream_stop");
let _ = std::fs::create_dir_all(&dir);
let path = dir.join("stop.sphtml");
let data = vec![0u8; 100];
spool_write(&path, &data).unwrap();
let mut count = 0usize;
spool_stream_chunks(&path, 10, |_| {
count += 1;
count < 3 })
.unwrap();
assert_eq!(count, 3);
spool_delete(&path);
let _ = std::fs::remove_dir_all(&dir);
}
// Streaming a missing file surfaces the open error.
#[test]
fn test_spool_stream_nonexistent() {
let path = std::env::temp_dir().join("spider_spool_no_exist.sphtml");
let result = spool_stream_chunks(&path, 10, |_| true);
assert!(result.is_err());
}
// Consecutive spool paths differ and carry the `.sphtml` extension.
#[test]
fn test_next_spool_path_unique() {
let p1 = next_spool_path();
let p2 = next_spool_path();
let p3 = next_spool_path();
assert_ne!(p1, p2);
assert_ne!(p2, p3);
assert_eq!(p1.extension().unwrap(), "sphtml");
}
// Repeated calls return the same spool directory.
#[test]
fn test_spool_dir_is_stable() {
let d1 = spool_dir();
let d2 = spool_dir();
assert_eq!(d1, d2);
}
// An empty file reads back empty and invokes the streaming callback zero times.
#[test]
fn test_spool_empty_data() {
let path = next_spool_path();
spool_write(&path, b"").unwrap();
let read_back = spool_read(&path).unwrap();
assert!(read_back.is_empty());
let mut chunks = 0;
spool_stream_chunks(&path, 10, |_| {
chunks += 1;
true
})
.unwrap();
assert_eq!(chunks, 0, "empty file should produce zero chunks");
spool_delete(&path);
}
// A 1 MiB patterned payload survives a write followed by a 64 KiB chunked stream.
#[test]
fn test_spool_large_data_stream() {
let size = 1024 * 1024;
let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
let path = next_spool_path();
spool_write(&path, &data).unwrap();
let mut collected = Vec::with_capacity(size);
let total = spool_stream_chunks(&path, 65536, |chunk| {
collected.extend_from_slice(chunk);
true
})
.unwrap();
assert_eq!(total, size);
assert_eq!(collected, data);
spool_delete(&path);
}
}