volli-manager 0.1.12

Manager for volli
Documentation
use fastrand::Rng;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use volli_core::{ManagerPeerEntry, env_config};

const SHORT_ID_LEN: usize = 8;

pub(crate) fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64
}

pub(crate) fn now_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}

pub(crate) fn test_dur(secs: u64) -> Duration {
    let ecfg = env_config();
    if let Some(ms) = ecfg.interval_ms {
        Duration::from_millis(ms)
    } else {
        Duration::from_secs(secs)
    }
}

pub async fn sleep_backoff(backoff: u64) {
    let ecfg = env_config();
    let base_ms = if let Some(ms) = ecfg.backoff_ms {
        ms
    } else {
        backoff * 1_000
    };
    // In tests, notify and return immediately to avoid timing-sensitive sleeps.
    #[cfg(test)]
    {
        if !IGNORE_TEST_SLEEP_NOTIFY.load(std::sync::atomic::Ordering::SeqCst)
            && let Some(tx) = TEST_SLEEP_NOTIFY.get()
        {
            let _ = tx.send(());
            return;
        }
    }
    // Jitter uniformly in [0.5x, 1.5x) of base to avoid thundering herds.
    // Use integer arithmetic to avoid boundary rounding producing 1.5x + 1ms.
    let mut rng = Rng::with_seed(now_millis());
    // Guard: if base_ms is 0 (explicit fast mode), return immediately.
    if base_ms == 0 {
        return;
    }
    let low = base_ms / 2; // 0.5x
    let wait_ms = low + rng.u64(0..base_ms); // in [0.5x, 1.5x)
    tokio::time::sleep(Duration::from_millis(wait_ms)).await;
}

pub(crate) fn short_id(id: &str) -> &str {
    id.get(..SHORT_ID_LEN).unwrap_or(id)
}

pub(crate) struct PeerLog<'a>(pub &'a ManagerPeerEntry);

impl<'a> std::fmt::Display for PeerLog<'a> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let p = self.0;
        write!(
            f,
            "{}@{}({})",
            p.manager_name,
            p.host,
            short_id(&p.manager_id)
        )
    }
}

// Test-only hooks to observe or bypass backoff sleeps deterministically.
#[cfg(test)]
use std::sync::{
    OnceLock,
    atomic::{AtomicBool, Ordering},
};
#[cfg(test)]
static TEST_SLEEP_NOTIFY: OnceLock<tokio::sync::mpsc::UnboundedSender<()>> = OnceLock::new();
#[cfg(test)]
static IGNORE_TEST_SLEEP_NOTIFY: AtomicBool = AtomicBool::new(false);
#[cfg(test)]
pub fn set_test_sleep_notifier(tx: tokio::sync::mpsc::UnboundedSender<()>) {
    let _ = TEST_SLEEP_NOTIFY.set(tx);
}
#[cfg(test)]
pub fn set_ignore_test_sleep_notify(ignore: bool) {
    IGNORE_TEST_SLEEP_NOTIFY.store(ignore, Ordering::SeqCst);
}