#![deny(clippy::unwrap_used)]
use std::sync::Arc;
use infino::{
supertable::{
Supertable,
storage::{LocalFsStorageProvider, StorageProvider},
},
test_helpers::{build_title_batch, default_supertable_options},
};
const DISK_CACHE_BUDGET_BYTES: u64 = 1 << 30;
const COLD_FETCH_STREAMS: usize = 4;
const COLD_FETCH_CHUNK_BYTES: u64 = 1 << 20;
use tempfile::TempDir;
#[test]
fn fresh_supertable_returns_empty_stats() {
let st = Supertable::create(default_supertable_options()).expect("create");
let s = st.stats();
assert_eq!(s.manifest_id, 0);
assert_eq!(s.n_superfiles, 0);
assert_eq!(
s.n_manifest_parts, 0,
"fresh in-process supertable has no persisted Manifest"
);
assert_eq!(s.n_manifest_parts_loaded, 0);
assert!(s.process_rss_bytes > 0, "RSS must be non-zero");
}
#[test]
fn stats_track_commits_on_in_process_supertable() {
let st = Supertable::create(default_supertable_options()).expect("create");
{
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["alpha"])).expect("append");
w.commit().expect("commit");
}
let s1 = st.stats();
assert_eq!(s1.manifest_id, 1);
assert!(s1.n_superfiles >= 1);
{
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["beta"])).expect("append");
w.commit().expect("commit");
}
let s2 = st.stats();
assert_eq!(s2.manifest_id, 2);
assert!(
s2.n_superfiles > s1.n_superfiles,
"commit must grow n_superfiles: {} → {}",
s1.n_superfiles,
s2.n_superfiles
);
assert_eq!(
s2.n_manifest_parts, 0,
"in-process supertable never has a persisted Manifest"
);
}
#[test]
fn stats_show_manifest_parts_when_storage_attached() {
let dir = TempDir::new().expect("tempdir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("provider"));
let producer =
Supertable::create(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("create");
{
let mut w = producer.writer().expect("writer");
w.append(&build_title_batch(&["initial"])).expect("append");
w.commit().expect("commit");
}
let producer_stats = producer.stats();
assert_eq!(producer_stats.manifest_id, 1);
assert_eq!(
producer_stats.n_manifest_parts, 1,
"post-commit persisted Manifest exists with one part"
);
assert_eq!(
producer_stats.n_manifest_parts_loaded, 1,
"commit path seeds the freshly-written part into the cache"
);
drop(producer);
let consumer =
Supertable::open(default_supertable_options().with_storage(Arc::clone(&storage)))
.expect("open");
let consumer_stats = consumer.stats();
assert_eq!(consumer_stats.manifest_id, 1);
assert_eq!(consumer_stats.n_manifest_parts, 1);
assert_eq!(
consumer_stats.n_manifest_parts_loaded, 1,
"open eager-fetches every part"
);
}
#[test]
fn process_rss_bytes_matches_independent_reading_within_bracket() {
let st = Supertable::create(default_supertable_options()).expect("create");
let read = || {
memory_stats::memory_stats()
.map(|m| m.physical_mem as u64)
.expect("RSS available")
};
let i1 = read();
let s = st.stats().process_rss_bytes;
let i2 = read();
assert!(s > 0, "stats.process_rss_bytes must be non-zero");
assert!(
i1 > 0 && i2 > 0,
"independent RSS readings must be non-zero"
);
const ABS_SLACK_BYTES: u64 = 64 * 1024 * 1024;
let drift = i1.abs_diff(i2);
let slack = drift.saturating_add(ABS_SLACK_BYTES);
let lo = i1.min(i2).saturating_sub(slack);
let hi = i1.max(i2).saturating_add(slack);
assert!(
s >= lo && s <= hi,
"stats.process_rss_bytes={s} outside [{lo}, {hi}] \
(independent reads: {i1}, {i2}; drift={drift}, slack={slack})"
);
}
#[test]
fn repeat_stats_calls_return_fresh_snapshots() {
let st = Supertable::create(default_supertable_options()).expect("create");
let pre = st.stats();
assert_eq!(pre.manifest_id, 0);
{
let mut w = st.writer().expect("writer");
w.append(&build_title_batch(&["something"]))
.expect("append");
w.commit().expect("commit");
}
let post = st.stats();
assert_eq!(post.manifest_id, 1);
assert!(post.n_superfiles > pre.n_superfiles);
}
#[test]
fn stats_without_disk_cache_have_none_cache_counters() {
let st = Supertable::create(default_supertable_options()).expect("create");
let s = st.stats();
assert_eq!(s.n_cold_fetches, None);
assert_eq!(s.n_cache_evictions, None);
assert_eq!(s.n_cache_madvise_calls, None);
assert_eq!(s.n_cache_entries, None);
assert_eq!(s.mmap_resident_bytes, None);
assert_eq!(s.memory_budget_bytes, None);
}
#[test]
fn stats_with_disk_cache_attached_surface_zero_counters_on_fresh_cache() {
use std::collections::HashSet;
use infino::supertable::{
SuperfileUri,
reader_cache::{ColdFetchMode, DiskCacheConfig, DiskCacheStore, LruPolicy},
};
let storage_dir = TempDir::new().expect("storage dir");
let cache_dir = TempDir::new().expect("cache dir");
let storage: Arc<dyn StorageProvider> =
Arc::new(LocalFsStorageProvider::new(storage_dir.path()).expect("provider"));
let cfg = DiskCacheConfig {
cache_root: cache_dir.path().to_path_buf(),
disk_budget_bytes: DISK_CACHE_BUDGET_BYTES,
cold_fetch_mode: ColdFetchMode::HybridWithPrefetch,
cold_fetch_streams: COLD_FETCH_STREAMS,
cold_fetch_chunk_bytes: COLD_FETCH_CHUNK_BYTES,
mmap_cold_threshold_secs: 0,
mmap_sweep_interval_secs: 0,
eviction: Box::new(LruPolicy::new()),
verify_crc_on_open: true,
..Default::default()
};
let pinned: Arc<dyn Fn() -> HashSet<SuperfileUri> + Send + Sync> = Arc::new(HashSet::new);
let cache = DiskCacheStore::new(Arc::clone(&storage), cfg, pinned).expect("cache");
let opts = default_supertable_options()
.with_storage(Arc::clone(&storage))
.with_disk_cache(Arc::clone(&cache));
let st = Supertable::create(opts).expect("create");
let s = st.stats();
assert_eq!(s.n_cold_fetches, Some(0), "fresh cache: zero cold fetches");
assert_eq!(s.n_cache_evictions, Some(0));
assert_eq!(s.n_cache_madvise_calls, Some(0));
assert_eq!(s.n_cache_entries, Some(0));
assert!(
s.mmap_resident_bytes.is_some(),
"mmap_resident_bytes surfaces when cache is attached"
);
}