infino 0.1.0

A fast retrieval engine that stores data on object storage and runs SQL, full-text search, and vector search over it from a single system — search-on-Parquet.
Documentation
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Infino Authors

//! `MADV_DONTNEED` sweep thread.
//!
//! Covers:
//! - sweep_once() advises mmap'd entries that have idled past
//!   `mmap_cold_threshold_secs`
//! - reads remain bit-correct after MADV_DONTNEED (read-only
//!   mappings re-fault from disk; pages may have been
//!   reclaimed but data is identical)
//! - sweep doesn't crash the reader (the FTS query path still
//!   works post-sweep)
//! - background thread starts when `mmap_cold_threshold_secs > 0`
//!   and runs at the configured cadence
//! - threshold=0 disables the sweep thread entirely
//! - in-memory-bytes-backed entries (hybrid foreground,
//!   not yet finalized) are skipped — only mmap'd entries
//!   participate in the sweep

#![deny(clippy::unwrap_used)]

use std::sync::Arc;

use arrow_array::{LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use infino::{
    superfile::{
        builder::{BuilderOptions, FtsConfig, SuperfileBuilder},
        fts::reader::BoolMode,
    },
    supertable::{
        SuperfileUri,
        reader_cache::{ColdFetchMode, DiskCacheConfig, DiskCacheStore, LruPolicy},
        storage::{LocalFsStorageProvider, StorageProvider},
    },
    test_helpers::{decimal128_ids, default_tokenizer},
};
use tempfile::TempDir;

// ============================================================
// Fixtures.
// ============================================================

/// Decimal128 precision / scale for the `doc_id` column.
const ID_DECIMAL_PRECISION: u8 = 38;
const ID_DECIMAL_SCALE: i8 = 0;
/// Disk-cache budget (1 GiB) for the sweep tests.
const DISK_CACHE_BUDGET_BYTES: u64 = 1 << 30;
/// Parallel cold-fetch streams.
const COLD_FETCH_STREAMS: usize = 4;
/// Small cold-fetch chunk to force multi-range fetches.
const COLD_FETCH_CHUNK_BYTES_SMALL: u64 = 64;
/// Finalize-poll deadline (ms) when waiting for the post-commit sweep.
const SWEEP_FINALIZE_POLL_TIMEOUT_MS: u64 = 2_000;
/// Poll interval (ms) inside the finalize wait loop.
const SWEEP_POLL_INTERVAL_MS: u64 = 10;
/// Short post-cold sleep to let background work settle.
const POST_COLD_SLEEP_MS: u64 = 50;
/// Idle threshold + sweep interval (seconds) for the skip-fresh test.
const SWEEP_IDLE_THRESHOLD_SECS: u64 = 3600;
/// Sweep interval (seconds) for the background-thread-disabled test.
const SWEEP_INTERVAL_ONE_SEC: u64 = 1;
/// Wait (ms) proving no background ticks fire when disabled.
const SWEEP_DISABLED_WAIT_MS: u64 = 1500;
/// BM25 top-k for the post-sweep query.
const FTS_TOP_K: usize = 10;

fn build_test_bytes() -> Bytes {
    let schema = Arc::new(Schema::new(vec![
        Field::new(
            "doc_id",
            DataType::Decimal128(ID_DECIMAL_PRECISION, ID_DECIMAL_SCALE),
            false,
        ),
        Field::new("title", DataType::LargeUtf8, false),
    ]));
    let opts = BuilderOptions::new(
        schema.clone(),
        "doc_id",
        vec![FtsConfig {
            column: "title".into(),
        }],
        vec![],
        Some(default_tokenizer()),
    );
    let mut b = SuperfileBuilder::new(opts).expect("builder");
    let ids = decimal128_ids(vec![1u64, 2, 3]);
    let titles = LargeStringArray::from(vec![
        "alpha bravo special",
        "charlie delta",
        "echo special foxtrot",
    ]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(titles)]).expect("batch");
    b.add_batch(&batch, &[]).expect("add_batch");
    Bytes::from(b.finish().expect("finish"))
}

async fn seed(storage: &dyn StorageProvider, uri: SuperfileUri, bytes: Bytes) {
    let path = uri.storage_path();
    storage.put_atomic(&path, bytes).await.expect("seed");
}

fn cache_with_threshold(
    storage: Arc<dyn StorageProvider>,
    threshold_secs: u64,
    sweep_interval_secs: u64,
) -> (TempDir, Arc<DiskCacheStore>) {
    let dir = TempDir::new().expect("tempdir");
    let cfg = DiskCacheConfig {
        cache_root: dir.path().to_path_buf(),
        disk_budget_bytes: DISK_CACHE_BUDGET_BYTES,
        cold_fetch_mode: ColdFetchMode::HybridWithPrefetch,
        cold_fetch_streams: COLD_FETCH_STREAMS,
        cold_fetch_chunk_bytes: COLD_FETCH_CHUNK_BYTES_SMALL,
        mmap_cold_threshold_secs: threshold_secs,
        mmap_sweep_interval_secs: sweep_interval_secs,
        eviction: Box::new(LruPolicy::new()),
        verify_crc_on_open: true,
        ..Default::default()
    };
    let store = DiskCacheStore::new_unpinned(storage, cfg).expect("store");
    (dir, store)
}

/// Poll `sweep_once()` until it advises at least one entry, or
/// until `timeout_ms` elapses.
///
/// The hybrid cold-fetch path inserts an in-memory entry first
/// and a background task later swaps it for the mmap-backed
/// entry. Under heavy parallel test load the swap may take well
/// over 50 ms — long enough that a fixed sleep races the
/// finalizer. Polling decouples the assertion from scheduler
/// jitter while still failing loudly if the entry is never
/// finalized (real regression).
///
/// `sweep_once()` only increments `n_madvise_calls` when it
/// actually advises an entry, so iterations that return 0 do
/// not perturb the cache stats — the final
/// `n_madvise_calls == 1` invariant still holds.
async fn sweep_once_after_finalize(cache: &Arc<DiskCacheStore>, timeout_ms: u64) -> u64 {
    let deadline = std::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
    loop {
        let n = cache.sweep_once();
        if n > 0 {
            return n;
        }
        if std::time::Instant::now() >= deadline {
            return n;
        }
        tokio::time::sleep(std::time::Duration::from_millis(SWEEP_POLL_INTERVAL_MS)).await;
    }
}

// ============================================================
// Tests.
// ============================================================

#[tokio::test]
async fn sweep_once_advises_mmapped_entries_when_threshold_is_zero() {
    // threshold=0 → every mmap'd entry is "cold" (now >= 0
    // microseconds since last access). The sweep returns
    // n_advised == n entries.
    let store_dir = TempDir::new().expect("storage");
    let local: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(store_dir.path()).expect("local"));
    let bytes = build_test_bytes();
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, bytes).await;

    let (_d, cache) = cache_with_threshold(local, 0, 0);
    let _reader = cache.reader(&uri).await.expect("cold");

    // Poll until the background finalizer has swapped the
    // in-memory entry for the mmap-backed one — sweep only
    // acts on mmap'd entries. threshold=0 means the sweep
    // thread doesn't start automatically; drive it explicitly.
    let n_advised = sweep_once_after_finalize(&cache, SWEEP_FINALIZE_POLL_TIMEOUT_MS).await;
    assert_eq!(
        n_advised, 1,
        "threshold=0 ⇒ every mmap'd entry advised; got {n_advised}"
    );
    let stats = cache.stats();
    assert_eq!(stats.n_madvise_calls, 1);
}

#[tokio::test]
async fn data_remains_correct_after_madv_dontneed() {
    // MADV_DONTNEED on read-only mmap is safe: dropped pages
    // re-fault from the backing file on next access; data is
    // bit-identical. Verify via an FTS query that survives a
    // sweep.
    let store_dir = TempDir::new().expect("storage");
    let local: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(store_dir.path()).expect("local"));
    let bytes = build_test_bytes();
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, bytes).await;

    let (_d, cache) = cache_with_threshold(local, 0, 0);
    let _r = cache.reader(&uri).await.expect("cold");

    // Poll for finalize → sweep; pages should now be advised
    // as DontNeed once the mmap-backed entry replaces the
    // in-memory one.
    let n_advised = sweep_once_after_finalize(&cache, SWEEP_FINALIZE_POLL_TIMEOUT_MS).await;
    assert!(n_advised >= 1, "sweep should advise at least one entry");

    // Acquire a fresh reader handle from the cache. The mmap
    // is still valid; the pages just need to re-fault.
    let reader = cache.reader(&uri).await.expect("warm after sweep");
    let fts = reader.fts().expect("fts");
    let hits = fts
        .search("title", &["special"], FTS_TOP_K, BoolMode::Or)
        .await
        .expect("bm25 after MADV_DONTNEED");
    assert_eq!(
        hits.len(),
        2,
        "two docs contain 'special'; data must be bit-correct after sweep"
    );
}

#[tokio::test]
async fn recent_access_skipped_by_sweep_when_threshold_nonzero() {
    // threshold=3600s; the entry was just accessed → idle <
    // threshold → sweep skips it.
    let store_dir = TempDir::new().expect("storage");
    let local: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(store_dir.path()).expect("local"));
    let bytes = build_test_bytes();
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, bytes).await;

    // Pick a long threshold + a long cadence (1h) so the
    // background thread doesn't tick during the test.
    let (_d, cache) =
        cache_with_threshold(local, SWEEP_IDLE_THRESHOLD_SECS, SWEEP_IDLE_THRESHOLD_SECS);
    let _r = cache.reader(&uri).await.expect("cold");
    tokio::time::sleep(std::time::Duration::from_millis(POST_COLD_SLEEP_MS)).await;

    // Drive sweep explicitly. Entry is fresh → not advised.
    let n_advised = cache.sweep_once();
    assert_eq!(
        n_advised, 0,
        "fresh entry must not be advised at long threshold; got {n_advised}"
    );
    assert_eq!(cache.stats().n_madvise_calls, 0);
}

#[tokio::test]
async fn in_memory_entries_not_yet_mmapped_are_skipped() {
    // The hybrid path inserts an in-memory entry first,
    // then the background finalizer swaps it for mmap. If
    // we sweep BEFORE finalize runs, the in-memory entry
    // has `mmap: None` and the sweep skips it (it has no
    // mmap to advise).
    //
    // We approximate this by checking that
    // `sweep_once()`'s return — n entries with mmap that
    // are idle — is 0 if all entries are still in their
    // in-memory state. With our test harness we can't
    // perfectly time this, so instead we test the post-
    // finalize state has the expected n_advised count.
    let store_dir = TempDir::new().expect("storage");
    let local: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(store_dir.path()).expect("local"));
    let bytes = build_test_bytes();
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, bytes).await;

    let (_d, cache) = cache_with_threshold(local, 0, 0);
    let _r = cache.reader(&uri).await.expect("cold");

    // Immediately sweep — finalize hasn't run yet, entry is
    // in-memory. Sweep should advise 0.
    let n_immediate = cache.sweep_once();
    // We can't always guarantee finalize hasn't run yet
    // (depends on scheduler), so this is a loose bound.
    // The real assertion is below.
    assert!(
        n_immediate <= 1,
        "sweep advised more than expected; got {n_immediate}"
    );

    // Now poll for the finalizer + sweep again. The entry is
    // mmap-backed; threshold=0 ⇒ advised.
    let n_after_finalize = sweep_once_after_finalize(&cache, SWEEP_FINALIZE_POLL_TIMEOUT_MS).await;
    assert_eq!(
        n_after_finalize, 1,
        "after finalize, the mmap'd entry must be advised; got {n_after_finalize}"
    );
}

#[tokio::test]
async fn threshold_zero_disables_background_sweep_thread() {
    // mmap_cold_threshold_secs == 0 → no background thread
    // spawned. Tests for "thread runs in background" are
    // unreliable wall-clock-wise; we verify the negative
    // case (no thread) by ensuring stats.n_madvise_calls
    // stays 0 over an interval that would otherwise have
    // included a sweep tick.
    let store_dir = TempDir::new().expect("storage");
    let local: Arc<dyn StorageProvider> =
        Arc::new(LocalFsStorageProvider::new(store_dir.path()).expect("local"));
    let bytes = build_test_bytes();
    let uri = SuperfileUri::new_v4();
    seed(&*local, uri, bytes).await;

    let (_d, cache) = cache_with_threshold(local, 0, SWEEP_INTERVAL_ONE_SEC);
    let _r = cache.reader(&uri).await.expect("cold");
    tokio::time::sleep(std::time::Duration::from_millis(POST_COLD_SLEEP_MS)).await;

    // Wait 1.5× the sweep interval — if the thread had
    // spawned, it would have ticked twice by now.
    tokio::time::sleep(std::time::Duration::from_millis(SWEEP_DISABLED_WAIT_MS)).await;
    let stats = cache.stats();
    assert_eq!(
        stats.n_madvise_calls, 0,
        "threshold=0 must disable the background sweep thread"
    );
}