#![deny(clippy::unwrap_used)]
use std::sync::Arc;
use arrow_array::{LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use infino::{
superfile::{
builder::{BuilderOptions, FtsConfig, SuperfileBuilder},
fts::reader::BoolMode,
},
supertable::{
SuperfileUri,
reader_cache::{ColdFetchMode, DiskCacheConfig, DiskCacheStore, LruPolicy},
storage::{LocalFsStorageProvider, StorageProvider},
},
test_helpers::{decimal128_ids, default_tokenizer},
};
use tempfile::TempDir;
// Precision/scale of the `doc_id` column's `Decimal128` type in the test superfile.
const ID_DECIMAL_PRECISION: u8 = 38;
const ID_DECIMAL_SCALE: i8 = 0;
// Disk budget for the cache under test: 1 GiB, far more than the tiny test superfile needs.
const DISK_CACHE_BUDGET_BYTES: u64 = 1 << 30;
// Forwarded to `DiskCacheConfig::cold_fetch_streams`.
const COLD_FETCH_STREAMS: usize = 4;
// Forwarded to `DiskCacheConfig::cold_fetch_chunk_bytes`. Deliberately tiny;
// presumably so the cold fetch is split into many chunks (TODO confirm intent).
const COLD_FETCH_CHUNK_BYTES_SMALL: u64 = 64;
// Upper bound on how long `sweep_once_after_finalize` keeps polling `sweep_once`.
const SWEEP_FINALIZE_POLL_TIMEOUT_MS: u64 = 2_000;
// Sleep between consecutive `sweep_once` polls in `sweep_once_after_finalize`.
const SWEEP_POLL_INTERVAL_MS: u64 = 10;
// Short pause the tests take right after the cold `reader()` call.
const POST_COLD_SLEEP_MS: u64 = 50;
// One hour. Used as both the idle threshold and the sweep interval in the
// "recent access is skipped" test, so a freshly read entry is never old enough.
const SWEEP_IDLE_THRESHOLD_SECS: u64 = 3600;
// Sweep interval used when checking that threshold=0 disables the background sweep.
const SWEEP_INTERVAL_ONE_SEC: u64 = 1;
// How long that test waits: longer than one `SWEEP_INTERVAL_ONE_SEC` interval.
const SWEEP_DISABLED_WAIT_MS: u64 = 1500;
// `k` passed to the FTS search in the post-sweep correctness test.
const FTS_TOP_K: usize = 10;
/// Builds a tiny serialized superfile containing three documents.
///
/// Schema:
/// - `doc_id`: non-null `Decimal128(ID_DECIMAL_PRECISION, ID_DECIMAL_SCALE)`, passed to
///   `BuilderOptions::new` as the id column.
/// - `title`: non-null `LargeUtf8`, configured for full-text search with the default tokenizer.
///
/// Two of the three titles contain the token `special`; the post-sweep correctness test
/// relies on that count.
fn build_test_bytes() -> Bytes {
    let doc_id_field = Field::new(
        "doc_id",
        DataType::Decimal128(ID_DECIMAL_PRECISION, ID_DECIMAL_SCALE),
        false,
    );
    let title_field = Field::new("title", DataType::LargeUtf8, false);
    let schema = Arc::new(Schema::new(vec![doc_id_field, title_field]));

    let fts_columns = vec![FtsConfig {
        column: "title".into(),
    }];
    let options = BuilderOptions::new(
        Arc::clone(&schema),
        "doc_id",
        fts_columns,
        vec![],
        Some(default_tokenizer()),
    );
    let mut builder = SuperfileBuilder::new(options).expect("builder");

    let doc_ids = decimal128_ids(vec![1u64, 2, 3]);
    let doc_titles = LargeStringArray::from(vec![
        "alpha bravo special",
        "charlie delta",
        "echo special foxtrot",
    ]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(doc_ids), Arc::new(doc_titles)])
        .expect("batch");
    builder.add_batch(&batch, &[]).expect("add_batch");

    let encoded = builder.finish().expect("finish");
    Bytes::from(encoded)
}
/// Writes `bytes` to the storage path derived from `uri` using `put_atomic`.
///
/// Panics with `"seed"` if the write fails.
async fn seed(storage: &dyn StorageProvider, uri: SuperfileUri, bytes: Bytes) {
    storage
        .put_atomic(&uri.storage_path(), bytes)
        .await
        .expect("seed");
}
/// Creates an unpinned `DiskCacheStore` over `storage`, rooted in a fresh temporary directory.
///
/// `threshold_secs` and `sweep_interval_secs` are passed through unchanged as
/// `mmap_cold_threshold_secs` and `mmap_sweep_interval_secs`. Every other setting is fixed:
/// hybrid cold fetch with prefetch, LRU eviction, CRC verification on open, and defaults
/// for the rest.
///
/// The `TempDir` is returned alongside the store. Callers must keep it bound for as long
/// as the store is in use, because dropping it deletes the cache root.
fn cache_with_threshold(
    storage: Arc<dyn StorageProvider>,
    threshold_secs: u64,
    sweep_interval_secs: u64,
) -> (TempDir, Arc<DiskCacheStore>) {
    let cache_dir = TempDir::new().expect("tempdir");
    let store = DiskCacheStore::new_unpinned(
        storage,
        DiskCacheConfig {
            cache_root: cache_dir.path().to_path_buf(),
            disk_budget_bytes: DISK_CACHE_BUDGET_BYTES,
            cold_fetch_mode: ColdFetchMode::HybridWithPrefetch,
            cold_fetch_streams: COLD_FETCH_STREAMS,
            cold_fetch_chunk_bytes: COLD_FETCH_CHUNK_BYTES_SMALL,
            mmap_cold_threshold_secs: threshold_secs,
            mmap_sweep_interval_secs: sweep_interval_secs,
            eviction: Box::new(LruPolicy::new()),
            verify_crc_on_open: true,
            ..Default::default()
        },
    )
    .expect("store");
    (cache_dir, store)
}
/// Polls `cache.sweep_once()` until it advises at least one entry or `timeout_ms` elapses.
///
/// Waits `SWEEP_POLL_INTERVAL_MS` between attempts. Returns the count from the final
/// `sweep_once` call: non-zero on success, `0` if the deadline passed first.
///
/// Polling is needed because the tests expect a freshly cold-fetched entry to become
/// advisable only after it is finalized (presumably asynchronously, into an mmap'd file —
/// TODO confirm).
async fn sweep_once_after_finalize(cache: &Arc<DiskCacheStore>, timeout_ms: u64) -> u64 {
    use std::time::{Duration, Instant};

    let deadline = Instant::now() + Duration::from_millis(timeout_ms);
    let poll_interval = Duration::from_millis(SWEEP_POLL_INTERVAL_MS);
    loop {
        let advised = cache.sweep_once();
        // The clock is only read when nothing was advised.
        if advised > 0 || Instant::now() >= deadline {
            break advised;
        }
        tokio::time::sleep(poll_interval).await;
    }
}
/// With a zero idle threshold, once the cold-fetched entry is finalized a single sweep
/// advises exactly that one entry, and the cache records exactly one madvise call.
#[tokio::test]
async fn sweep_once_advises_mmapped_entries_when_threshold_is_zero() {
    let store_dir = TempDir::new().expect("storage");
    let provider = LocalFsStorageProvider::new(store_dir.path()).expect("local");
    let local: Arc<dyn StorageProvider> = Arc::new(provider);
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, build_test_bytes()).await;

    let (_cache_dir, cache) = cache_with_threshold(local, 0, 0);
    // Keep the cold-fetched reader alive for the rest of the test.
    let _reader = cache.reader(&uri).await.expect("cold");

    let n_advised = sweep_once_after_finalize(&cache, SWEEP_FINALIZE_POLL_TIMEOUT_MS).await;
    assert_eq!(
        n_advised, 1,
        "threshold=0 ⇒ every mmap'd entry advised; got {n_advised}"
    );
    assert_eq!(cache.stats().n_madvise_calls, 1);
}
/// After a sweep has advised the cached entry (the test name refers to MADV_DONTNEED),
/// a new reader must still return correct FTS results: exactly the two documents
/// containing `special`.
#[tokio::test]
async fn data_remains_correct_after_madv_dontneed() {
    let store_dir = TempDir::new().expect("storage");
    let provider = LocalFsStorageProvider::new(store_dir.path()).expect("local");
    let local: Arc<dyn StorageProvider> = Arc::new(provider);
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, build_test_bytes()).await;

    let (_cache_dir, cache) = cache_with_threshold(local, 0, 0);
    let _cold_reader = cache.reader(&uri).await.expect("cold");

    let n_advised = sweep_once_after_finalize(&cache, SWEEP_FINALIZE_POLL_TIMEOUT_MS).await;
    assert!(n_advised >= 1, "sweep should advise at least one entry");

    // Re-open through the cache and query the advised data.
    let warm_reader = cache.reader(&uri).await.expect("warm after sweep");
    let fts = warm_reader.fts().expect("fts");
    let hits = fts
        .search("title", &["special"], FTS_TOP_K, BoolMode::Or)
        .await
        .expect("bm25 after MADV_DONTNEED");
    assert_eq!(
        hits.len(),
        2,
        "two docs contain 'special'; data must be bit-correct after sweep"
    );
}
/// With a one-hour idle threshold, an entry read moments ago must not be advised by a
/// manual sweep, and no madvise call may be recorded.
#[tokio::test]
async fn recent_access_skipped_by_sweep_when_threshold_nonzero() {
    let store_dir = TempDir::new().expect("storage");
    let provider = LocalFsStorageProvider::new(store_dir.path()).expect("local");
    let local: Arc<dyn StorageProvider> = Arc::new(provider);
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, build_test_bytes()).await;

    // The same long value is used for both the idle threshold and the sweep interval.
    let (_cache_dir, cache) = cache_with_threshold(
        local,
        SWEEP_IDLE_THRESHOLD_SECS,
        SWEEP_IDLE_THRESHOLD_SECS,
    );
    let _cold_reader = cache.reader(&uri).await.expect("cold");
    tokio::time::sleep(std::time::Duration::from_millis(POST_COLD_SLEEP_MS)).await;

    let n_advised = cache.sweep_once();
    assert_eq!(
        n_advised, 0,
        "fresh entry must not be advised at long threshold; got {n_advised}"
    );
    assert_eq!(cache.stats().n_madvise_calls, 0);
}
/// An entry that has not yet been finalized to an mmap'd file should not be advised.
/// Once finalization completes, a sweep should advise exactly that one entry.
#[tokio::test]
async fn in_memory_entries_not_yet_mmapped_are_skipped() {
    let store_dir = TempDir::new().expect("storage");
    let provider = LocalFsStorageProvider::new(store_dir.path()).expect("local");
    let local: Arc<dyn StorageProvider> = Arc::new(provider);
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, build_test_bytes()).await;

    let (_cache_dir, cache) = cache_with_threshold(local, 0, 0);
    let _cold_reader = cache.reader(&uri).await.expect("cold");

    // Sweep right after the cold read. Finalization may already have happened, so this
    // only bounds the count instead of asserting 0.
    let n_immediate = cache.sweep_once();
    assert!(
        n_immediate <= 1,
        "sweep advised more than expected; got {n_immediate}"
    );

    // NOTE(review): if the immediate sweep already advised the entry (n_immediate == 1),
    // the assertion below relies on `sweep_once` advising it again. Confirm re-advise
    // semantics, otherwise this test is timing-dependent.
    let n_after_finalize = sweep_once_after_finalize(&cache, SWEEP_FINALIZE_POLL_TIMEOUT_MS).await;
    assert_eq!(
        n_after_finalize, 1,
        "after finalize, the mmap'd entry must be advised; got {n_after_finalize}"
    );
}
/// With a zero idle threshold, the background sweep must not run even when a 1-second
/// sweep interval is configured. After waiting longer than one interval, no madvise calls
/// may have been recorded.
#[tokio::test]
async fn threshold_zero_disables_background_sweep_thread() {
    let store_dir = TempDir::new().expect("storage");
    let provider = LocalFsStorageProvider::new(store_dir.path()).expect("local");
    let local: Arc<dyn StorageProvider> = Arc::new(provider);
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, build_test_bytes()).await;

    let (_cache_dir, cache) = cache_with_threshold(local, 0, SWEEP_INTERVAL_ONE_SEC);
    let _cold_reader = cache.reader(&uri).await.expect("cold");

    // Two back-to-back waits: the short post-cold pause, then the longer window
    // (> one sweep interval) in which a background sweep would have fired if enabled.
    for wait_ms in [POST_COLD_SLEEP_MS, SWEEP_DISABLED_WAIT_MS] {
        tokio::time::sleep(std::time::Duration::from_millis(wait_ms)).await;
    }

    assert_eq!(
        cache.stats().n_madvise_calls, 0,
        "threshold=0 must disable the background sweep thread"
    );
}