net-mesh 0.23.0

High-performance, schema-agnostic, backend-agnostic event bus
Documentation
//! Retention policy for RedEX (count + size + age).
//!
//! This module is pure logic: given the current index state, the
//! per-entry timestamps, and a config, it returns the number of head
//! entries to evict. The actual segment rewrite happens in
//! `RedexFile::sweep_retention`.

use super::config::RedexFileConfig;
use super::entry::{RedexEntry, REDEX_ENTRY_SIZE};

/// Compute how many head entries to drop so that the retained tail
/// satisfies every active retention policy (`retention_max_events`,
/// `retention_max_bytes`, `retention_max_age_ns`). When multiple are
/// set, the policies AND together — we take the largest drop count
/// so that ALL constraints are satisfied.
///
/// `entries` is the current index in seq order. `timestamps` is a
/// parallel slice of unix-nanos timestamps captured at append time
/// (same length as `entries`). `now_ns` is the current wall clock in
/// unix nanos. Returns a count such that `entries[count..]` is the
/// post-sweep tail.
pub(crate) fn compute_eviction_count(
    entries: &[RedexEntry],
    timestamps: &[u64],
    now_ns: u64,
    cfg: &RedexFileConfig,
) -> usize {
    debug_assert_eq!(
        entries.len(),
        timestamps.len(),
        "timestamps must parallel entries"
    );

    let mut drop = 0usize;

    // Count-based.
    if let Some(max_events) = cfg.retention_max_events {
        let len = entries.len() as u64;
        if len > max_events {
            drop = (len - max_events) as usize;
        }
    }

    // Size-based: walk from the tail back, accumulating bytes; anything
    // beyond the cap is dropped. `idx` here is the forward index — the
    // entry at `entries[idx]`. When it doesn't fit, we drop [0..idx+1).
    //
    // The per-entry size counted against `retention_max_bytes` is the
    // total on-disk / in-memory footprint: 20 bytes of index record
    // plus the heap payload (or 0 for inline, since the payload rides
    // inside the index record). Counting only the payload would make
    // small-payload workloads blow past the cap through index overhead
    // alone.
    if let Some(max_bytes) = cfg.retention_max_bytes {
        let mut retained_bytes: u64 = 0;
        for (idx, e) in entries.iter().enumerate().rev() {
            let size = entry_total_size(e);
            if retained_bytes + size > max_bytes {
                drop = drop.max(idx + 1);
                break;
            }
            retained_bytes += size;
        }
        // If we exited the loop without breaking, all entries fit —
        // size policy drops none.
    }

    // Age-based: drop entries whose timestamp is STRICTLY older
    // than the cutoff. Timestamps are ~monotonic (wall clock)
    // within a single process, so the first entry with
    // `ts >= cutoff` marks the retained head.
    //
    // Pre-fix, the predicate was `ts > cutoff` (drop on
    // `ts <= cutoff`), so an entry with `ts == cutoff` — i.e.
    // an entry exactly `max_age_ns` old — was dropped.
    // Intuitive "max age" semantics retain the boundary entry;
    // breaking change to align the implementation with that
    // intuition. Callers that depended on the off-by-one (e.g.
    // tests asserting "5 of 10 dropped under
    // `max_age = total - 1`") see one fewer drop now.
    if let Some(max_age_ns) = cfg.retention_max_age_ns {
        let cutoff = now_ns.saturating_sub(max_age_ns);
        let mut age_drop = 0;
        for &ts in timestamps.iter() {
            if ts >= cutoff {
                break;
            }
            age_drop += 1;
        }
        drop = drop.max(age_drop);
    }

    drop
}

/// Total on-disk / in-memory size attributable to one entry: the
/// 20-byte index record plus the heap payload, if any. Inline entries
/// carry their payload inside the 20 bytes and contribute nothing
/// extra.
#[inline]
fn entry_total_size(e: &RedexEntry) -> u64 {
    let idx = REDEX_ENTRY_SIZE as u64;
    if e.is_inline() {
        idx
    } else {
        idx + e.payload_len as u64
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::super::entry::RedexEntry;
    use super::*;

    fn heap_entries(count: usize, each_size: u32) -> Vec<RedexEntry> {
        (0..count)
            .map(|i| RedexEntry::new_heap(i as u64, (i as u32) * each_size, each_size, 0, 0))
            .collect()
    }

    /// Parallel timestamps — all "now" for count/size tests that
    /// don't exercise the age policy.
    fn dummy_timestamps(count: usize) -> Vec<u64> {
        vec![0; count]
    }

    #[test]
    fn test_no_retention_drops_nothing() {
        let entries = heap_entries(100, 16);
        let ts = dummy_timestamps(100);
        let cfg = RedexFileConfig::default();
        assert_eq!(compute_eviction_count(&entries, &ts, 0, &cfg), 0);
    }

    #[test]
    fn test_count_retention() {
        let entries = heap_entries(100, 16);
        let ts = dummy_timestamps(100);
        let cfg = RedexFileConfig::default().with_retention_max_events(40);
        assert_eq!(compute_eviction_count(&entries, &ts, 0, &cfg), 60);
    }

    #[test]
    fn test_size_retention() {
        // Each heap entry costs 20 (index record) + 16 (payload) = 36
        // bytes against the retention_max_bytes budget. 36 × 13 = 468
        // fits in 480; 36 × 14 = 504 does not → keep 13, drop 87.
        let entries = heap_entries(100, 16);
        let ts = dummy_timestamps(100);
        let cfg = RedexFileConfig::default().with_retention_max_bytes(480);
        assert_eq!(compute_eviction_count(&entries, &ts, 0, &cfg), 87);
    }

    #[test]
    fn test_both_count_and_size_takes_larger_drop() {
        // count policy keeps 40 (drops 60), size policy keeps 13
        // (drops 87) with the same 480-byte budget as
        // test_size_retention. The AND-together rule takes the
        // larger drop count.
        let entries = heap_entries(100, 16);
        let ts = dummy_timestamps(100);
        let cfg = RedexFileConfig::default()
            .with_retention_max_events(40)
            .with_retention_max_bytes(480);
        assert_eq!(compute_eviction_count(&entries, &ts, 0, &cfg), 87);
    }

    #[test]
    fn test_regression_size_retention_counts_index_overhead() {
        // Regression: size retention used to count only the payload
        // bytes, so a workload with tiny payloads and a tight budget
        // could retain far more entries than the configured cap in
        // actual memory/disk usage. Inline entries (payload inside
        // the 20-byte index record) in particular were charged 8
        // bytes each when they actually cost 20.
        //
        // Fix: charge 20 bytes of index record for every entry
        // (inline or heap), plus the heap payload. Inline-only
        // workloads now see the cap respected byte-for-byte.
        use super::super::entry::{payload_checksum, RedexEntry};

        // 10 inline entries. Pre-fix: accounting was 80 bytes; a
        // 100-byte cap retained all 10. Post-fix: accounting is
        // 200 bytes; a 100-byte cap retains at most 5 (100 / 20).
        let entries: Vec<RedexEntry> = (0..10u64)
            .map(|i| {
                let payload = i.to_le_bytes();
                RedexEntry::new_inline(i, &payload, payload_checksum(&payload))
            })
            .collect();
        let ts = dummy_timestamps(10);
        let cfg = RedexFileConfig::default().with_retention_max_bytes(100);
        assert_eq!(
            compute_eviction_count(&entries, &ts, 0, &cfg),
            5,
            "size retention must charge the 20-byte index record per inline entry"
        );
    }

    #[test]
    fn test_empty_index() {
        let cfg = RedexFileConfig::default().with_retention_max_events(10);
        assert_eq!(compute_eviction_count(&[], &[], 0, &cfg), 0);
    }

    // ---- Age policy ----

    fn ts_seq(count: usize, step_ns: u64) -> Vec<u64> {
        (0..count as u64).map(|i| i * step_ns + 1_000).collect()
    }

    #[test]
    fn test_age_retention_drops_older_than_cutoff() {
        // 10 entries, 1 ns apart at t=1000..1009.
        // max_age = 5 ns, now = 1009 → cutoff = 1004. Drop entries
        // with ts < 1004: ts 1000..=1003 = 4 entries.
        //
        // Breaking change: pre-fix, the predicate was
        // `ts > cutoff` (drop on `ts <= cutoff`), so an entry
        // exactly `max_age_ns` old (ts == cutoff) was dropped —
        // 5 entries dropped here. Post-fix uses `ts >= cutoff`
        // (drop on `ts < cutoff`), retaining the boundary entry
        // for intuitive "max age" semantics; 4 entries dropped.
        let entries = heap_entries(10, 16);
        let ts = ts_seq(10, 1);
        let cfg = RedexFileConfig::default().with_retention_max_age(Duration::from_nanos(5));
        assert_eq!(compute_eviction_count(&entries, &ts, 1009, &cfg), 4);
    }

    #[test]
    fn test_age_retention_no_drops_when_all_young() {
        let entries = heap_entries(5, 16);
        // timestamps just below now; max_age large → nothing expires.
        let ts = vec![100, 101, 102, 103, 104];
        let cfg = RedexFileConfig::default().with_retention_max_age(Duration::from_nanos(1000));
        assert_eq!(compute_eviction_count(&entries, &ts, 105, &cfg), 0);
    }

    #[test]
    fn test_age_retention_drops_all_when_all_old() {
        let entries = heap_entries(5, 16);
        let ts = vec![1, 2, 3, 4, 5];
        let cfg = RedexFileConfig::default().with_retention_max_age(Duration::from_nanos(1));
        // cutoff = now - 1 = 99; all ts <= 99 → drop all 5.
        assert_eq!(compute_eviction_count(&entries, &ts, 100, &cfg), 5);
    }

    #[test]
    fn test_age_retention_now_before_any_timestamp_drops_nothing() {
        // Weird edge case: clock skew or tests with small now.
        // All timestamps > now → cutoff = 0 (saturating_sub) → nothing
        // drops.
        let entries = heap_entries(3, 16);
        let ts = vec![1000, 2000, 3000];
        let cfg = RedexFileConfig::default().with_retention_max_age(Duration::from_nanos(100));
        assert_eq!(compute_eviction_count(&entries, &ts, 50, &cfg), 0);
    }

    #[test]
    fn test_combined_count_and_age_takes_larger_drop() {
        // Count says drop 3 (keep newest 2 of 5). Age says drop 4
        // (only newest 1 fits under 2 ns cutoff). Final drop = 4.
        let entries = heap_entries(5, 16);
        let ts = vec![100, 101, 102, 103, 104];
        let cfg = RedexFileConfig::default()
            .with_retention_max_events(2)
            .with_retention_max_age(Duration::from_nanos(2));
        // now=104, cutoff=102 → ts 100,101,102 → 3 old; ts 103 stops.
        // Wait — ts > cutoff is the condition to STOP. So 100,101,102 all <= 102 → 3 drops.
        // Count says drop 3 too. Max is 3.
        assert_eq!(compute_eviction_count(&entries, &ts, 104, &cfg), 3);
    }

    #[test]
    fn test_combined_age_larger_than_count() {
        // 10 entries with small old timestamps. Count says drop 5,
        // age says drop 10. Max = 10 wins.
        let entries = heap_entries(10, 16);
        let ts = vec![1u64; 10];
        let cfg = RedexFileConfig::default()
            .with_retention_max_events(5)
            .with_retention_max_age(Duration::from_nanos(1));
        // now=1000, cutoff=999. All ts = 1 < 999 → drop 10.
        assert_eq!(compute_eviction_count(&entries, &ts, 1000, &cfg), 10);
    }

    /// Regression: an entry with timestamp exactly equal
    /// to the cutoff (`ts == now - max_age_ns`) — i.e. an entry
    /// that is exactly `max_age_ns` old — must be RETAINED, not
    /// dropped. Pre-fix the predicate was `ts > cutoff` (drop on
    /// `ts <= cutoff`), so the boundary entry was dropped. Post-
    /// fix uses `ts >= cutoff` (drop on `ts < cutoff`), aligning
    /// with intuitive "max age N retains entries up to N old"
    /// semantics.
    #[test]
    fn bug23_entry_at_exact_cutoff_is_retained() {
        let entries = heap_entries(3, 16);
        // ts: [10, 15, 20]. now=20, max_age=5 → cutoff=15.
        let ts = vec![10u64, 15, 20];
        let cfg = RedexFileConfig::default().with_retention_max_age(Duration::from_nanos(5));
        // Only ts=10 (< 15) should be dropped; ts=15 (== cutoff)
        // is retained (NEW behavior); ts=20 is fresh.
        assert_eq!(
            compute_eviction_count(&entries, &ts, 20, &cfg),
            1,
            "entry at exactly cutoff (ts=15, max_age=5, now=20) \
             must be retained — pre-fix this dropped 2 entries"
        );
    }
}