kevy-rt 1.6.1

kevy thread-per-core shared-nothing runtime — pure Rust, zero deps.
Documentation
//! Per-tick housekeeping for the [`Shard`] reactor — pulled out of
//! [`crate::shard`] to keep that file under the 500-LOC house rule.
//!
//! Called from the reactor's tick branch (once per `tick_interval_ms`,
//! 100 ms by default). On each tick, every `Some` field returned by the
//! embedder's [`crate::Commands::live_runtime_config`] is applied to the
//! shard's live state, and an automatic AOF rewrite runs if the live AOF
//! has grown past its threshold.

use crate::Commands;
use crate::shard::Shard;
use std::time::Duration;

impl<C: Commands> Shard<C> {
    /// Fetch the hot-swappable settings from the [`crate::Commands`] impl
    /// and copy every field that is `Some` into this shard.
    ///
    /// `tick_interval` is the reactor's own tick period, passed in by
    /// the caller: a live value of `0` ms sets it to `None`, any other
    /// value replaces it with that many milliseconds.
    ///
    /// Runs on the tick branch (once per `tick_interval_ms`), so the
    /// cost is spread over thousands of commands. Embedders that never
    /// hot-swap get the trait default, where every field is `None` and
    /// nothing beyond building the struct happens.
    pub(crate) fn apply_live_runtime_config(&mut self, tick_interval: &mut Option<Duration>) {
        let live = self.commands.live_runtime_config();

        // The fsync policy only matters when this shard has an AOF.
        if let (Some(policy), Some(aof)) = (live.appendfsync, self.aof.as_mut()) {
            // If the flush on a tightened policy fails we only log it: the
            // shard stays up, the new policy is still in force, and later
            // appends will attempt the sync again.
            if let Err(e) = aof.set_fsync(policy) {
                eprintln!("kevy: shard {} set_fsync failed: {e}", self.id);
            }
        }

        if let Some(pct) = live.auto_aof_rewrite_pct {
            self.auto_aof_rewrite_pct = pct;
        }
        if let Some(min_size) = live.auto_aof_rewrite_min_size {
            self.auto_aof_rewrite_min_size = min_size;
        }

        if let Some(ms) = live.tick_interval_ms {
            // Zero milliseconds maps to "no tick interval".
            *tick_interval = (ms != 0).then(|| Duration::from_millis(ms));
        }

        if let Some(mask) = live.notify_flags {
            self.notify_flags = mask;
        }

        if let Some(threshold) = live.slowlog_slower_than_micros {
            self.slowlog.slower_than_micros = threshold;
        }
        if let Some(limit) = live.slowlog_max_len {
            self.slowlog.max_len = limit;
            // Drop entries from the front until the buffer fits the new
            // limit.
            let excess = self.slowlog.buf.len().saturating_sub(limit as usize);
            for _ in 0..excess {
                self.slowlog.buf.pop_front();
            }
        }
    }

    /// Rewrite the AOF inline when it has outgrown its size at the last
    /// rewrite by at least `auto_aof_rewrite_pct` percent.
    ///
    /// Runs on the tick path, i.e. at most once per `tick_interval_ms`,
    /// so each check is amortised over many writes. Returns without doing
    /// anything when `auto_aof_rewrite_pct` is `0`, when the shard has no
    /// AOF, or when the AOF is still below `auto_aof_rewrite_min_size`.
    /// A failed rewrite is reported on stderr and otherwise ignored.
    pub(crate) fn maybe_auto_rewrite_aof(&mut self) {
        let pct = self.auto_aof_rewrite_pct;
        if pct == 0 {
            return;
        }
        // One mutable borrow serves both the size queries and the rewrite.
        let Some(aof) = self.aof.as_mut() else {
            return;
        };

        let current = aof.size_bytes();
        if current < self.auto_aof_rewrite_min_size {
            return;
        }

        // A baseline of zero is clamped to one byte.
        let baseline = aof.size_at_last_rewrite().max(1);

        // Growth test without division:
        //   (current - baseline) * 100 / baseline >= pct
        //   <=>  current * 100 >= baseline * (100 + pct)
        // Saturating arithmetic keeps both sides from wrapping.
        let scaled_current = current.saturating_mul(100);
        let scaled_trigger = baseline.saturating_mul(100u64.saturating_add(pct as u64));
        if scaled_current < scaled_trigger {
            return;
        }

        if let Err(e) = aof.rewrite_from(&self.store) {
            eprintln!("kevy: shard {} auto AOF rewrite failed: {e}", self.id);
        }
    }
}