use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicI8, AtomicU64, AtomicUsize, Ordering};
use std::sync::OnceLock;
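// Global spool accounting and plumbing: total HTML bytes currently held in
// memory, the number of pages spooled to disk, a monotonic counter used to
// mint unique spool file names, the last sampled memory-pressure state, and
// the channel feeding the background file-deletion thread.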
static TOTAL_HTML_BYTES_IN_MEMORY: AtomicUsize = AtomicUsize::new(0);
static PAGES_ON_DISK: AtomicUsize = AtomicUsize::new(0);
static SPOOL_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
static CACHED_MEM_STATE: AtomicI8 = AtomicI8::new(0);
static CLEANUP_TX: OnceLock<tokio::sync::mpsc::UnboundedSender<PathBuf>> = OnceLock::new();
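/// Lazily start the background cleanup thread and return the sender used to
/// queue spool files for deletion. Deletions run on a dedicated OS thread so
/// that `remove_file` never blocks an async worker.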
fn cleanup_sender() -> &'static tokio::sync::mpsc::UnboundedSender<PathBuf> {
CLEANUP_TX.get_or_init(|| {
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<PathBuf>();
std::thread::Builder::new()
.name("spider-spool-cleanup".into())
.spawn(move || {
while let Some(path) = rx.blocking_recv() {
let _ = std::fs::remove_file(&path);
}
})
.expect("failed to spawn spool cleanup thread");
tx
})
}
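/// Queue a spool file for deletion on the background cleanup thread.
/// Fire-and-forget: send errors are ignored.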
#[inline]
pub fn queue_spool_delete(path: PathBuf) {
let _ = cleanup_sender().send(path);
}
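/// Test-only barrier: push a marker file through the cleanup channel and wait
/// (up to two seconds) for the thread to delete it, which implies all
/// previously queued deletions have been processed.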
#[cfg(test)]
pub fn flush_cleanup() {
let marker = spool_dir().join(format!(
".flush_{}",
SPOOL_FILE_COUNTER.fetch_add(1, Ordering::Relaxed)
));
let _ = std::fs::write(&marker, b"");
let _ = cleanup_sender().send(marker.clone());
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
while marker.exists() && std::time::Instant::now() < deadline {
std::thread::yield_now();
}
}
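/// Minimum HTML size eligible for spooling, configurable via
/// `SPIDER_HTML_SPOOL_MIN_SIZE` (default 64 KiB). `should_spool` never spools
/// bodies at or below this size.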
fn spool_min_size() -> usize {
static VAL: OnceLock<usize> = OnceLock::new();
*VAL.get_or_init(|| {
std::env::var("SPIDER_HTML_SPOOL_MIN_SIZE")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(64 * 1024)
})
}
static SPOOL_DIR: OnceLock<SpoolDirHandle> = OnceLock::new();
struct SpoolDirHandle {
_dir: tempfile::TempDir,
path: PathBuf,
}
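/// Per-website spool directory. Normally owns a unique temp subdirectory of
/// the global spool dir, removed when the handle drops; if creating one
/// fails, it falls back to sharing the global spool dir without owning it.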
pub struct WebsiteSpoolDir {
owned: Option<tempfile::TempDir>,
path: PathBuf,
}
impl std::fmt::Debug for WebsiteSpoolDir {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WebsiteSpoolDir")
.field("path", &self.path)
.field("owned", &self.owned.is_some())
.finish()
}
}
impl WebsiteSpoolDir {
#[inline]
pub fn new_or_shared() -> Self {
match tempfile::Builder::new()
.prefix("spider_website_")
.tempdir_in(spool_dir())
{
Ok(td) => {
let path = td.path().to_path_buf();
Self {
owned: Some(td),
path,
}
}
Err(_) => Self {
owned: None,
path: spool_dir().to_path_buf(),
},
}
}
#[inline]
pub fn path(&self) -> &Path {
&self.path
}
#[inline]
pub fn next_path(&self) -> PathBuf {
let id = SPOOL_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
self.path.join(format!("{id}.sphtml"))
}
}
tokio::task_local! {
pub static WEBSITE_SPOOL_DIR: std::sync::Arc<WebsiteSpoolDir>;
}
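/// Returns the website-scoped spool dir installed in the current tokio task
/// via the `WEBSITE_SPOOL_DIR` task-local, if any.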
#[inline]
pub fn current_website_spool_dir() -> Option<std::sync::Arc<WebsiteSpoolDir>> {
WEBSITE_SPOOL_DIR.try_with(|d| d.clone()).ok()
}
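/// Soft budget for total HTML bytes held in memory, configurable via
/// `SPIDER_HTML_MEMORY_BUDGET` (default 2 GiB). Its companion
/// `base_per_page_threshold` (`SPIDER_HTML_PAGE_SPOOL_SIZE`, default 80 MiB)
/// bounds how large a single page may grow before spooling kicks in.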
fn base_memory_budget() -> usize {
static VAL: OnceLock<usize> = OnceLock::new();
*VAL.get_or_init(|| {
std::env::var("SPIDER_HTML_MEMORY_BUDGET")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(2 * 1024 * 1024 * 1024)
})
}
fn base_per_page_threshold() -> usize {
static VAL: OnceLock<usize> = OnceLock::new();
*VAL.get_or_init(|| {
std::env::var("SPIDER_HTML_PAGE_SPOOL_SIZE")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(80 * 1024 * 1024)
})
}
#[inline]
pub fn track_bytes_add(n: usize) {
TOTAL_HTML_BYTES_IN_MEMORY.fetch_add(n, Ordering::Relaxed);
}
#[inline]
pub fn track_bytes_sub(n: usize) {
let _ = TOTAL_HTML_BYTES_IN_MEMORY.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
Some(cur.saturating_sub(n))
});
}
#[inline]
pub fn total_bytes_in_memory() -> usize {
TOTAL_HTML_BYTES_IN_MEMORY.load(Ordering::Relaxed)
}
#[inline]
pub fn track_page_spooled() {
PAGES_ON_DISK.fetch_add(1, Ordering::Relaxed);
}
#[inline]
pub fn track_page_unspooled() {
let _ = PAGES_ON_DISK.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
Some(cur.saturating_sub(1))
});
}
#[inline]
pub fn pages_on_disk() -> usize {
PAGES_ON_DISK.load(Ordering::Relaxed)
}
#[inline]
pub fn refresh_cached_mem_state() {
CACHED_MEM_STATE.store(
crate::utils::detect_system::get_process_memory_state_sync(),
Ordering::Relaxed,
);
}
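/// Decide whether an HTML body of `html_len` bytes should be spooled to disk
/// instead of being kept in memory. Bodies at or below `spool_min_size()`
/// always stay in memory; above that, the cached memory-pressure state
/// drives the decision:
/// - state >= 2: always spool,
/// - state == 1: spool when the page exceeds a quarter of the per-page
///   threshold or would push the running total past the memory budget,
/// - state <= 0: spool only when the page exceeds the per-page threshold.
///
/// Illustrative call-site sketch (the `html` buffer is assumed, not part of
/// this module):
///
/// ```ignore
/// if should_spool(html.len()) {
///     let path = next_spool_path();
///     spool_write(&path, &html)?;
///     track_page_spooled();
/// } else {
///     track_bytes_add(html.len());
/// }
/// ```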
#[inline]
pub fn should_spool(html_len: usize) -> bool {
if html_len <= spool_min_size() {
return false;
}
let threshold = base_per_page_threshold();
let mem_state = CACHED_MEM_STATE.load(Ordering::Relaxed);
match mem_state {
s if s >= 2 => return true,
s if s >= 1 => {
if html_len > threshold / 4 {
return true;
}
let current = total_bytes_in_memory();
if current.saturating_add(html_len) > base_memory_budget() {
return true;
}
}
_ => {
if html_len > threshold {
return true;
}
}
}
false
}
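/// Global spool directory, initialized once per process. Honors
/// `SPIDER_HTML_SPOOL_DIR` when set (creating a unique subdirectory inside
/// it), otherwise uses a fresh temp directory. Panics only if no temp
/// directory can be created at all.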
pub fn spool_dir() -> &'static Path {
&SPOOL_DIR
.get_or_init(|| {
if let Ok(custom) = std::env::var("SPIDER_HTML_SPOOL_DIR") {
let dir = PathBuf::from(&custom);
let _ = std::fs::create_dir_all(&dir);
match tempfile::Builder::new()
.prefix("spider_html_")
.tempdir_in(&dir)
{
Ok(td) => {
let path = td.path().to_path_buf();
return SpoolDirHandle { _dir: td, path };
}
Err(_) => {
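// Could not create a unique subdir inside the custom dir: use the
// custom dir directly; the anonymous TempDir merely satisfies the
// handle's field.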
return SpoolDirHandle {
_dir: tempfile::Builder::new()
.prefix("spider_html_fallback_")
.tempdir()
.expect("failed to create temp dir"),
path: dir,
};
}
}
}
let td = tempfile::Builder::new()
.prefix("spider_html_")
.tempdir()
.expect("failed to create temp dir for HTML spool");
let path = td.path().to_path_buf();
SpoolDirHandle { _dir: td, path }
})
.path
}
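/// Allocate a unique `.sphtml` path, preferring the current task's
/// website-scoped spool dir and falling back to the global directory.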
pub fn next_spool_path() -> PathBuf {
if let Some(dir) = current_website_spool_dir() {
return dir.next_path();
}
let id = SPOOL_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
spool_dir().join(format!("{id}.sphtml"))
}
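// Blocking, std-based spool I/O helpers; async counterparts follow below.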
pub fn spool_write(path: &Path, data: &[u8]) -> std::io::Result<()> {
std::fs::write(path, data)
}
pub fn spool_read(path: &Path) -> std::io::Result<Vec<u8>> {
std::fs::read(path)
}
pub fn spool_read_bytes(path: &Path) -> std::io::Result<bytes::Bytes> {
std::fs::read(path).map(bytes::Bytes::from)
}
pub fn spool_delete(path: &Path) {
let _ = std::fs::remove_file(path);
}
pub async fn spool_read_bytes_async(path: std::path::PathBuf) -> std::io::Result<bytes::Bytes> {
crate::utils::uring_fs::read_file(path.display().to_string())
.await
.map(bytes::Bytes::from)
}
pub async fn spool_read_async(path: std::path::PathBuf) -> std::io::Result<Vec<u8>> {
crate::utils::uring_fs::read_file(path.display().to_string()).await
}
pub async fn spool_write_async(path: &Path, data: &[u8]) -> std::io::Result<()> {
crate::utils::uring_fs::write_file(path.display().to_string(), data.to_vec()).await
}
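/// Facts gathered about a body while it is written to the spool: its byte
/// length, whether it is valid UTF-8, whether it looks like a binary file,
/// and whether it starts with an XML declaration.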
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SpoolVitals {
pub byte_len: usize,
pub is_valid_utf8: bool,
pub binary_file: bool,
pub is_xml: bool,
}
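/// Write `data` to `path` in 64 KiB chunks while computing `SpoolVitals` in
/// the same pass. UTF-8 validity is checked incrementally: when a chunk ends
/// mid-sequence, the trailing bytes are carried over and re-validated
/// together with the next chunk, so multi-byte codepoints spanning chunk
/// boundaries are still accepted. Errors up front if the body would exceed
/// `spool_max_write_bytes()`.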
pub async fn spool_write_streaming_vitals(
path: &Path,
data: &[u8],
) -> std::io::Result<SpoolVitals> {
use tokio::io::AsyncWriteExt;
const CHUNK: usize = 64 * 1024;
let byte_len = data.len();
if byte_len > spool_max_write_bytes() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"spool write would exceed SPIDER_HTML_SPOOL_MAX_BYTES",
));
}
let head = &data[..data.len().min(16)];
let binary_file = auto_encoder::is_binary_file(head);
let is_xml = head.starts_with(b"<?xml");
let file = tokio::fs::File::create(path).await?;
let mut writer = tokio::io::BufWriter::with_capacity(CHUNK, file);
let mut is_valid_utf8 = true;
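// UTF-8 sequences may straddle chunk boundaries: keep the trailing bytes of
// an incomplete sequence and re-validate them together with the next chunk.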
let mut carry: [u8; 4] = [0; 4];
let mut carry_len: usize = 0;
let mut scratch: Vec<u8> = Vec::new();
for chunk in data.chunks(CHUNK) {
writer.write_all(chunk).await?;
if !is_valid_utf8 {
continue;
}
let to_validate: &[u8] = if carry_len == 0 {
chunk
} else {
scratch.clear();
scratch.reserve(carry_len + chunk.len());
scratch.extend_from_slice(&carry[..carry_len]);
scratch.extend_from_slice(chunk);
&scratch[..]
};
match simdutf8::compat::from_utf8(to_validate) {
Ok(_) => {
carry_len = 0;
}
Err(e) => {
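// `error_len()` is `Some` for definitively invalid bytes; `None` means
// the input merely ended in the middle of a multi-byte sequence.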
if e.error_len().is_some() {
is_valid_utf8 = false;
continue;
}
let trailing = &to_validate[e.valid_up_to()..];
let keep = trailing.len().min(carry.len());
let mut tmp: [u8; 4] = [0; 4];
tmp[..keep].copy_from_slice(&trailing[trailing.len() - keep..]);
carry[..keep].copy_from_slice(&tmp[..keep]);
carry_len = keep;
}
}
}
writer.flush().await?;
let _file = writer.into_inner();
if carry_len > 0 {
is_valid_utf8 = false;
}
Ok(SpoolVitals {
byte_len,
is_valid_utf8,
binary_file,
is_xml,
})
}
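/// Number of bytes retained from the head and tail of a spooled body so
/// small samples stay available without re-reading the file.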
pub const SPOOL_HEAD_TAIL_CAP: usize = 256;
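/// Maximum size a single spool write may reach, configurable via
/// `SPIDER_HTML_SPOOL_MAX_BYTES` (default 1 GiB, clamped to a 4 GiB ceiling).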
pub fn spool_max_write_bytes() -> usize {
static VAL: OnceLock<usize> = OnceLock::new();
*VAL.get_or_init(|| {
const DEFAULT: usize = 1024 * 1024 * 1024;
const HARD_CEILING: usize = 4 * 1024 * 1024 * 1024;
std::env::var("SPIDER_HTML_SPOOL_MAX_BYTES")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.map(|n| n.min(HARD_CEILING))
.unwrap_or(DEFAULT)
})
}
pub const SPOOL_SIGNATURE_BUFFER_CAP: usize = 16 * 1024 * 1024;
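/// A spooled body plus the metadata needed to reason about it without
/// reading the file back: vitals, head/tail samples, and an optional
/// content signature.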
#[derive(Debug, Clone, Default)]
pub struct SpooledContent {
pub path: std::path::PathBuf,
pub vitals: SpoolVitals,
pub head: bytes::Bytes,
pub tail: bytes::Bytes,
pub signature: Option<u64>,
}
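/// Incremental counterpart to `spool_write_streaming_vitals` for bodies that
/// arrive chunk by chunk: feed data with `write_chunk`, then call `finish`
/// to obtain the vitals plus the first and last `SPOOL_HEAD_TAIL_CAP` bytes.
/// A minimal usage sketch (the chunk source `body` is assumed, not part of
/// this module):
///
/// ```ignore
/// let path = next_spool_path();
/// let mut w = StreamingVitalsSpoolWriter::new(&path).await?;
/// while let Some(chunk) = body.next().await {
///     w.write_chunk(&chunk?).await?;
/// }
/// let (vitals, head, tail) = w.finish().await?;
/// ```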
pub struct StreamingVitalsSpoolWriter {
writer: tokio::io::BufWriter<tokio::fs::File>,
byte_len: usize,
is_valid_utf8: bool,
binary_file: bool,
is_xml: bool,
header_seen: bool,
carry: [u8; 4],
carry_len: usize,
scratch: Vec<u8>,
head: Vec<u8>,
tail_ring: Vec<u8>,
tail_head: usize,
tail_fed: usize,
}
impl StreamingVitalsSpoolWriter {
const CHUNK: usize = 64 * 1024;
pub async fn new(path: &Path) -> std::io::Result<Self> {
let file = tokio::fs::File::create(path).await?;
let writer = tokio::io::BufWriter::with_capacity(Self::CHUNK, file);
Ok(Self {
writer,
byte_len: 0,
is_valid_utf8: true,
binary_file: false,
is_xml: false,
header_seen: false,
carry: [0; 4],
carry_len: 0,
scratch: Vec::new(),
head: Vec::with_capacity(SPOOL_HEAD_TAIL_CAP),
tail_ring: Vec::with_capacity(SPOOL_HEAD_TAIL_CAP),
tail_head: 0,
tail_fed: 0,
})
}
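/// Append one chunk: write it through the buffered file handle, update the
/// running length (erroring if it would exceed `spool_max_write_bytes()`),
/// sniff binary/XML signatures from the first chunk, accumulate the head
/// sample, maintain the tail ring buffer, and advance the incremental UTF-8
/// validation using the carry bytes left by the previous chunk.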
pub async fn write_chunk(&mut self, chunk: &[u8]) -> std::io::Result<()> {
use tokio::io::AsyncWriteExt;
if chunk.is_empty() {
return Ok(());
}
let projected = self.byte_len.saturating_add(chunk.len());
if projected > spool_max_write_bytes() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"spool write would exceed SPIDER_HTML_SPOOL_MAX_BYTES",
));
}
self.writer.write_all(chunk).await?;
self.byte_len = projected;
if !self.header_seen {
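// Signature sniffing samples up to 16 bytes of the first chunk, mirroring
// the single-shot writer's head sample.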
let head_sample_len = chunk.len().min(16);
let head_sample = &chunk[..head_sample_len];
self.binary_file = auto_encoder::is_binary_file(head_sample);
self.is_xml = head_sample.starts_with(b"<?xml");
self.header_seen = true;
}
if self.head.len() < SPOOL_HEAD_TAIL_CAP {
let remaining = SPOOL_HEAD_TAIL_CAP - self.head.len();
let take = chunk.len().min(remaining);
self.head.extend_from_slice(&chunk[..take]);
}
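// Maintain a ring buffer holding the last `SPOOL_HEAD_TAIL_CAP` bytes seen.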
let cap = SPOOL_HEAD_TAIL_CAP;
if self.tail_fed == 0 && chunk.len() <= cap {
self.tail_ring.clear();
self.tail_ring.extend_from_slice(chunk);
self.tail_head = self.tail_ring.len() % cap;
self.tail_fed = chunk.len();
} else if chunk.len() >= cap {
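// The chunk alone covers the whole tail window: rebuild the ring from its
// last `cap` bytes.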
self.tail_ring.clear();
self.tail_ring
.extend_from_slice(&chunk[chunk.len() - cap..]);
self.tail_head = 0;
self.tail_fed = self.tail_fed.saturating_add(chunk.len());
} else {
if self.tail_ring.len() < cap {
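// Ring not yet full: append until full, then wrap any remainder starting
// at index 0.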
let needed = cap - self.tail_ring.len();
let pad = chunk.len().min(needed);
self.tail_ring.extend_from_slice(&chunk[..pad]);
let rest = &chunk[pad..];
if !rest.is_empty() {
let ring_cap = self.tail_ring.len();
for (i, b) in rest.iter().enumerate() {
self.tail_ring[i % ring_cap] = *b;
}
self.tail_head = rest.len() % ring_cap;
} else {
self.tail_head = self.tail_ring.len() % cap;
}
} else {
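// Ring full: overwrite the oldest bytes, advancing the head with wraparound.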
for b in chunk {
self.tail_ring[self.tail_head] = *b;
self.tail_head += 1;
if self.tail_head == cap {
self.tail_head = 0;
}
}
}
self.tail_fed = self.tail_fed.saturating_add(chunk.len());
}
if !self.is_valid_utf8 {
return Ok(());
}
let to_validate: &[u8] = if self.carry_len == 0 {
chunk
} else {
self.scratch.clear();
self.scratch.reserve(self.carry_len + chunk.len());
self.scratch
.extend_from_slice(&self.carry[..self.carry_len]);
self.scratch.extend_from_slice(chunk);
&self.scratch[..]
};
match simdutf8::compat::from_utf8(to_validate) {
Ok(_) => {
self.carry_len = 0;
}
Err(e) => {
if e.error_len().is_some() {
self.is_valid_utf8 = false;
} else {
let trailing = &to_validate[e.valid_up_to()..];
let keep = trailing.len().min(self.carry.len());
let mut tmp: [u8; 4] = [0; 4];
tmp[..keep].copy_from_slice(&trailing[trailing.len() - keep..]);
self.carry[..keep].copy_from_slice(&tmp[..keep]);
self.carry_len = keep;
}
}
}
Ok(())
}
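/// Flush buffered output and return the vitals along with the head and tail
/// samples. A non-empty UTF-8 carry at this point means the body ended on an
/// incomplete sequence, so it is marked invalid.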
pub async fn finish(mut self) -> std::io::Result<(SpoolVitals, bytes::Bytes, bytes::Bytes)> {
use tokio::io::AsyncWriteExt;
self.writer.flush().await?;
let _file = self.writer.into_inner();
if self.carry_len > 0 {
self.is_valid_utf8 = false;
}
let head = bytes::Bytes::from(self.head);
let tail = if self.tail_fed <= SPOOL_HEAD_TAIL_CAP {
bytes::Bytes::from(self.tail_ring)
} else {
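// The ring wrapped: rotate so the oldest byte (at `tail_head`) comes first.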
let cap = self.tail_ring.len();
let mut out = Vec::with_capacity(cap);
let head_idx = self.tail_head;
out.extend_from_slice(&self.tail_ring[head_idx..]);
out.extend_from_slice(&self.tail_ring[..head_idx]);
bytes::Bytes::from(out)
};
Ok((
SpoolVitals {
byte_len: self.byte_len,
is_valid_utf8: self.is_valid_utf8,
binary_file: self.binary_file,
is_xml: self.is_xml,
},
head,
tail,
))
}
}
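/// Async variant of `spool_stream_chunks`, delegating to the crate's uring
/// file reader.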
pub async fn spool_stream_chunks_async(
path: std::path::PathBuf,
chunk_size: usize,
cb: impl FnMut(&[u8]) -> bool,
) -> std::io::Result<usize> {
crate::utils::uring_fs::read_file_chunked(path.display().to_string(), chunk_size, cb).await
}
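/// Best-effort removal of the global spool directory. Intended for shutdown;
/// spool writes issued afterwards will fail until the process restarts.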
pub fn cleanup_spool_dir() {
if let Some(handle) = SPOOL_DIR.get() {
let _ = std::fs::remove_dir_all(&handle.path);
}
}
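/// Read a spool file in `chunk_size` pieces, invoking `cb` per chunk until it
/// returns `false` or EOF. Returns the total bytes read, including the chunk
/// that stopped the stream.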
pub fn spool_stream_chunks<F>(path: &Path, chunk_size: usize, mut cb: F) -> std::io::Result<usize>
where
F: FnMut(&[u8]) -> bool,
{
use std::io::Read;
let mut file = std::fs::File::open(path)?;
let mut buf = vec![0u8; chunk_size];
let mut total = 0usize;
loop {
let n = file.read(&mut buf)?;
if n == 0 {
break;
}
total = total.saturating_add(n);
if !cb(&buf[..n]) {
break;
}
}
Ok(total)
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
#[test]
fn test_byte_accounting_saturating() {
let base = total_bytes_in_memory();
track_bytes_add(1000);
assert_eq!(total_bytes_in_memory(), base + 1000);
track_bytes_sub(600);
assert_eq!(total_bytes_in_memory(), base + 400);
track_bytes_sub(400);
assert_eq!(total_bytes_in_memory(), base);
let before_sat = total_bytes_in_memory();
track_bytes_sub(before_sat + 1);
assert_eq!(total_bytes_in_memory(), 0);
track_bytes_add(before_sat);
}
#[test]
fn test_page_disk_counter() {
{
let base = pages_on_disk();
track_page_spooled();
track_page_spooled();
assert_eq!(pages_on_disk(), base + 2);
track_page_unspooled();
assert_eq!(pages_on_disk(), base + 1);
track_page_unspooled();
assert_eq!(pages_on_disk(), base);
}
}
#[test]
fn test_should_spool_decision() {
assert!(!should_spool(100));
assert!(!should_spool(spool_min_size()));
assert!(!should_spool(200 * 1024));
assert!(!should_spool(5 * 1024 * 1024));
assert!(!should_spool(10 * 1024 * 1024));
assert!(should_spool(base_per_page_threshold() + 1));
}
#[test]
fn test_spool_write_read_delete() {
let dir = std::env::temp_dir().join("spider_spool_test_rw");
let _ = std::fs::create_dir_all(&dir);
let path = dir.join("test.sphtml");
let data = b"<html><body>hello</body></html>";
spool_write(&path, data).unwrap();
let read_back = spool_read(&path).unwrap();
assert_eq!(&read_back, data);
let bytes = spool_read_bytes(&path).unwrap();
assert_eq!(&bytes[..], data);
spool_delete(&path);
assert!(!path.exists());
spool_delete(&path);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn test_spool_read_nonexistent() {
let path = std::env::temp_dir().join("spider_spool_does_not_exist.sphtml");
assert!(spool_read(&path).is_err());
assert!(spool_read_bytes(&path).is_err());
}
#[test]
fn test_spool_stream_chunks() {
let dir = std::env::temp_dir().join("spider_spool_stream_test2");
let _ = std::fs::create_dir_all(&dir);
let path = dir.join("stream.sphtml");
let data = b"abcdefghijklmnopqrstuvwxyz";
spool_write(&path, data).unwrap();
let mut collected = Vec::new();
let total = spool_stream_chunks(&path, 10, |chunk| {
collected.extend_from_slice(chunk);
true
})
.unwrap();
assert_eq!(collected, data);
assert_eq!(total, data.len());
spool_delete(&path);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn test_spool_stream_early_stop() {
let dir = std::env::temp_dir().join("spider_spool_stream_stop");
let _ = std::fs::create_dir_all(&dir);
let path = dir.join("stop.sphtml");
let data = vec![0u8; 100];
spool_write(&path, &data).unwrap();
let mut count = 0usize;
spool_stream_chunks(&path, 10, |_| {
count += 1;
count < 3
})
.unwrap();
assert_eq!(count, 3);
spool_delete(&path);
let _ = std::fs::remove_dir_all(&dir);
}
#[test]
fn test_spool_stream_nonexistent() {
let path = std::env::temp_dir().join("spider_spool_no_exist.sphtml");
let result = spool_stream_chunks(&path, 10, |_| true);
assert!(result.is_err());
}
#[test]
fn test_next_spool_path_unique() {
let p1 = next_spool_path();
let p2 = next_spool_path();
let p3 = next_spool_path();
assert_ne!(p1, p2);
assert_ne!(p2, p3);
assert_eq!(p1.extension().unwrap(), "sphtml");
}
#[test]
fn test_spool_dir_is_stable() {
let d1 = spool_dir();
let d2 = spool_dir();
assert_eq!(d1, d2);
}
#[test]
fn test_spool_empty_data() {
let path = next_spool_path();
spool_write(&path, b"").unwrap();
let read_back = spool_read(&path).unwrap();
assert!(read_back.is_empty());
let mut chunks = 0;
spool_stream_chunks(&path, 10, |_| {
chunks += 1;
true
})
.unwrap();
assert_eq!(chunks, 0, "empty file should produce zero chunks");
spool_delete(&path);
}
#[test]
fn test_spool_large_data_stream() {
let size = 1024 * 1024;
let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
let path = next_spool_path();
spool_write(&path, &data).unwrap();
let mut collected = Vec::with_capacity(size);
let total = spool_stream_chunks(&path, 65536, |chunk| {
collected.extend_from_slice(chunk);
true
})
.unwrap();
assert_eq!(total, size);
assert_eq!(collected, data);
spool_delete(&path);
}
#[tokio::test]
async fn test_spool_streaming_vitals_matches_reference_ascii() {
let data = b"<html><body>simple ascii page</body></html>";
let path = next_spool_path();
let vitals = spool_write_streaming_vitals(&path, data).await.unwrap();
assert_eq!(vitals.byte_len, data.len());
assert!(vitals.is_valid_utf8);
assert!(!vitals.binary_file);
assert!(!vitals.is_xml);
let on_disk = std::fs::read(&path).unwrap();
assert_eq!(on_disk, data);
spool_delete(&path);
}
#[tokio::test]
async fn test_spool_streaming_vitals_utf8_multibyte() {
let mut data: Vec<u8> = Vec::with_capacity(256 * 1024);
for _ in 0..(90 * 1024) {
data.extend_from_slice("€".as_bytes());
}
assert!(simdutf8::basic::from_utf8(&data).is_ok());
let path = next_spool_path();
let vitals = spool_write_streaming_vitals(&path, &data).await.unwrap();
assert_eq!(vitals.byte_len, data.len());
assert!(
vitals.is_valid_utf8,
"multi-byte codepoint spanning chunk boundaries must stay valid"
);
spool_delete(&path);
}
#[tokio::test]
async fn test_spool_streaming_vitals_utf8_invalid() {
let mut data: Vec<u8> = b"<html>valid prefix".to_vec();
data.push(0x80);
data.extend_from_slice(b"</html>");
let path = next_spool_path();
let vitals = spool_write_streaming_vitals(&path, &data).await.unwrap();
assert_eq!(vitals.byte_len, data.len());
assert!(!vitals.is_valid_utf8);
let on_disk = std::fs::read(&path).unwrap();
assert_eq!(on_disk, data);
spool_delete(&path);
}
#[tokio::test]
async fn test_spool_streaming_vitals_xml_header() {
let data = br#"<?xml version="1.0"?><feed/>"#;
let path = next_spool_path();
let vitals = spool_write_streaming_vitals(&path, data).await.unwrap();
assert!(vitals.is_xml);
assert!(vitals.is_valid_utf8);
spool_delete(&path);
}
#[tokio::test]
async fn test_spool_streaming_vitals_empty() {
let path = next_spool_path();
let vitals = spool_write_streaming_vitals(&path, &[]).await.unwrap();
assert_eq!(vitals.byte_len, 0);
assert!(
vitals.is_valid_utf8,
"empty bytes are trivially valid utf-8"
);
assert!(!vitals.binary_file);
assert!(!vitals.is_xml);
spool_delete(&path);
}
#[tokio::test]
async fn test_streaming_writer_matches_single_shot() {
let mut data: Vec<u8> = Vec::with_capacity(200 * 1024);
for i in 0..(200 * 1024) {
data.push((b'a' + (i % 26) as u8) as u8);
}
let ref_path = next_spool_path();
let ref_vitals = spool_write_streaming_vitals(&ref_path, &data)
.await
.unwrap();
spool_delete(&ref_path);
let path = next_spool_path();
let mut w = StreamingVitalsSpoolWriter::new(&path).await.unwrap();
for chunk in data.chunks(7919) {
w.write_chunk(chunk).await.unwrap();
}
let (vitals, head, tail) = w.finish().await.unwrap();
assert_eq!(vitals.byte_len, ref_vitals.byte_len);
assert_eq!(vitals.is_valid_utf8, ref_vitals.is_valid_utf8);
assert_eq!(vitals.binary_file, ref_vitals.binary_file);
assert_eq!(vitals.is_xml, ref_vitals.is_xml);
assert_eq!(head.as_ref(), &data[..SPOOL_HEAD_TAIL_CAP]);
assert_eq!(tail.as_ref(), &data[data.len() - SPOOL_HEAD_TAIL_CAP..]);
let on_disk = std::fs::read(&path).unwrap();
assert_eq!(on_disk, data);
spool_delete(&path);
}
#[tokio::test]
async fn test_streaming_writer_small_head_tail() {
let data = b"<html><body>tiny</body></html>";
let path = next_spool_path();
let mut w = StreamingVitalsSpoolWriter::new(&path).await.unwrap();
w.write_chunk(data).await.unwrap();
let (_, head, tail) = w.finish().await.unwrap();
assert_eq!(head.as_ref(), data.as_slice());
assert_eq!(tail.as_ref(), data.as_slice());
spool_delete(&path);
}
#[test]
fn test_spool_max_write_bytes_hard_ceiling() {
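// Exercises the clamping arithmetic directly; mutating the env var here
// would race with other tests and the `OnceLock` cache.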
let parsed: usize = "99999999999999999".parse().unwrap_or(0);
assert!(parsed > 4 * 1024 * 1024 * 1024);
let capped = parsed.min(4 * 1024 * 1024 * 1024);
assert_eq!(capped, 4 * 1024 * 1024 * 1024);
}
#[tokio::test]
async fn test_streaming_writer_multibyte_across_boundaries() {
let mut data: Vec<u8> = Vec::with_capacity(90 * 1024 * 3);
for _ in 0..(90 * 1024) {
data.extend_from_slice("€".as_bytes());
}
let path = next_spool_path();
let mut w = StreamingVitalsSpoolWriter::new(&path).await.unwrap();
for chunk in data.chunks(3331) {
w.write_chunk(chunk).await.unwrap();
}
let (vitals, _, _) = w.finish().await.unwrap();
assert!(vitals.is_valid_utf8);
assert_eq!(vitals.byte_len, data.len());
spool_delete(&path);
}
}