use crate::env::{read, runtime as runtime_env};
use once_cell::sync::Lazy;
use std::time::Duration;
/// Tunable wait intervals used by the sleep/yield helpers in this module.
///
/// Field suffixes name the unit: `_ms` is milliseconds, `_us` is microseconds.
#[derive(Clone, Debug)]
pub struct SleepConfig {
    /// Shutdown grace period, in whole milliseconds.
    pub shutdown_grace_ms: u64,
    /// Block-strategy wait, in milliseconds; fractional values express
    /// sub-millisecond waits (e.g. `0.01` is 10 microseconds).
    pub block_strategy_ms: f64,
    /// Discovery polling interval, in whole milliseconds.
    pub discovery_poll_ms: u64,
    /// Consume sleep interval, in whole microseconds.
    pub consume_sleep_us: u64,
    /// Threshold in microseconds: `sleep_or_yield` yields the thread instead
    /// of sleeping when the requested duration is at or below this value.
    pub sleep_yield_threshold_us: u64,
    /// Consumer busy-wait interval, in whole microseconds.
    pub consumer_busy_wait_us: u64,
}
impl Default for SleepConfig {
fn default() -> Self {
Self {
shutdown_grace_ms: 10,
block_strategy_ms: 0.01,
discovery_poll_ms: 10,
consume_sleep_us: 1,
sleep_yield_threshold_us: 50,
consumer_busy_wait_us: 10,
}
}
}
impl SleepConfig {
    /// Builds a configuration from [`SleepConfig::default`], replacing each
    /// field for which `read::parse` yields a value for the matching
    /// `runtime_env` key.
    ///
    /// The block-strategy wait can be supplied two ways: `BLOCK_STRATEGY_US`
    /// (whole microseconds) takes precedence; only when it yields nothing is
    /// `BLOCK_STRATEGY_MS` (fractional milliseconds) consulted, and a value
    /// that is not `>= 0.0` is ignored in favour of the default.
    pub fn from_env() -> Self {
        let defaults = Self::default();

        // Lookups are performed in the same order as the struct fields.
        let shutdown_grace_ms = read::parse::<u64>(runtime_env::SHUTDOWN_GRACE_MS)
            .unwrap_or(defaults.shutdown_grace_ms);

        let block_strategy_ms = match read::parse::<u64>(runtime_env::BLOCK_STRATEGY_US) {
            // Stored internally as milliseconds, so scale microseconds down.
            Some(us) => us as f64 / 1000.0,
            None => read::parse::<f64>(runtime_env::BLOCK_STRATEGY_MS)
                .filter(|ms| *ms >= 0.0)
                .unwrap_or(defaults.block_strategy_ms),
        };

        let discovery_poll_ms = read::parse::<u64>(runtime_env::DISCOVERY_POLL_MS)
            .unwrap_or(defaults.discovery_poll_ms);
        let consume_sleep_us = read::parse::<u64>(runtime_env::CONSUME_SLEEP_US)
            .unwrap_or(defaults.consume_sleep_us);
        let sleep_yield_threshold_us = read::parse::<u64>(runtime_env::SLEEP_YIELD_THRESHOLD_US)
            .unwrap_or(defaults.sleep_yield_threshold_us);
        let consumer_busy_wait_us = read::parse::<u64>(runtime_env::CONSUMER_BUSY_WAIT_US)
            .unwrap_or(defaults.consumer_busy_wait_us);

        Self {
            shutdown_grace_ms,
            block_strategy_ms,
            discovery_poll_ms,
            consume_sleep_us,
            sleep_yield_threshold_us,
            consumer_busy_wait_us,
        }
    }

    /// Shutdown grace period as a [`Duration`].
    pub fn shutdown_grace_duration(&self) -> Duration {
        let grace_ms = self.shutdown_grace_ms;
        Duration::from_millis(grace_ms)
    }

    /// Block-strategy wait as a [`Duration`], truncated to whole nanoseconds.
    ///
    /// The float-to-integer `as` casts truncate toward zero and saturate, so
    /// a negative or NaN `block_strategy_ms` produces a zero duration.
    pub fn block_strategy_duration(&self) -> Duration {
        // Whole microseconds; dividing by 1000 below gives the millisecond part.
        let whole_micros = (self.block_strategy_ms * 1000.0) as u64;
        // Whole nanoseconds; the remainder below is the sub-millisecond part.
        let whole_nanos = (self.block_strategy_ms * 1_000_000.0) as u64;

        let millis_part = Duration::from_millis(whole_micros / 1000);
        let sub_millis_part = Duration::from_nanos(whole_nanos % 1_000_000);
        millis_part + sub_millis_part
    }

    /// Discovery polling interval as a [`Duration`].
    pub fn discovery_poll_duration(&self) -> Duration {
        let poll_ms = self.discovery_poll_ms;
        Duration::from_millis(poll_ms)
    }

    /// Consume sleep interval as a [`Duration`].
    pub fn consume_sleep_duration(&self) -> Duration {
        let sleep_us = self.consume_sleep_us;
        Duration::from_micros(sleep_us)
    }

    /// Consumer busy-wait interval as a [`Duration`]; compiled for tests only.
    #[cfg(test)]
    pub fn consumer_busy_wait_duration(&self) -> Duration {
        let busy_wait_us = self.consumer_busy_wait_us;
        Duration::from_micros(busy_wait_us)
    }
}
/// Process-wide sleep configuration, initialised through
/// [`SleepConfig::from_env`] by the `Lazy` wrapper.
pub static SLEEP_CONFIG: Lazy<SleepConfig> = Lazy::new(SleepConfig::from_env);
#[inline]
pub fn default_block_strategy_duration() -> Duration {
SLEEP_CONFIG.block_strategy_duration()
}
#[inline]
pub fn default_consume_sleep_duration() -> Duration {
SLEEP_CONFIG.consume_sleep_duration()
}
#[inline]
pub fn default_discovery_poll_duration() -> Duration {
SLEEP_CONFIG.discovery_poll_duration()
}
/// Waits for `duration`, choosing between sleeping and yielding.
///
/// Durations strictly longer than the configured
/// `sleep_yield_threshold_us` are passed to [`std::thread::sleep`]; anything
/// at or below the threshold only calls [`std::thread::yield_now`].
#[inline]
pub fn sleep_or_yield(duration: Duration) {
    let yield_threshold = Duration::from_micros(SLEEP_CONFIG.sleep_yield_threshold_us);
    if duration > yield_threshold {
        std::thread::sleep(duration);
    } else {
        std::thread::yield_now();
    }
}
/// Waits for the configured block-strategy duration via [`sleep_or_yield`].
#[inline]
pub fn perform_default_block_wait() {
    let wait = default_block_strategy_duration();
    sleep_or_yield(wait);
}
/// Waits for the configured consume sleep duration via [`sleep_or_yield`].
#[inline]
pub fn perform_default_consume_sleep_wait() {
    let wait = default_consume_sleep_duration();
    sleep_or_yield(wait);
}
/// Waits for the configured discovery polling duration via [`sleep_or_yield`].
#[inline]
pub fn perform_default_discovery_poll_wait() {
    let wait = default_discovery_poll_duration();
    sleep_or_yield(wait);
}
/// Waits for a caller-supplied `duration` by delegating to [`sleep_or_yield`].
#[inline]
pub fn perform_sleep_wait(duration: Duration) {
    sleep_or_yield(duration)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every built-in default so an accidental change is caught.
    #[test]
    fn test_default_config() {
        let SleepConfig {
            shutdown_grace_ms,
            block_strategy_ms,
            discovery_poll_ms,
            consume_sleep_us,
            sleep_yield_threshold_us,
            consumer_busy_wait_us,
        } = SleepConfig::default();

        assert_eq!(shutdown_grace_ms, 10);
        assert_eq!(block_strategy_ms, 0.01);
        assert_eq!(discovery_poll_ms, 10);
        assert_eq!(consume_sleep_us, 1);
        assert_eq!(sleep_yield_threshold_us, 50);
        assert_eq!(consumer_busy_wait_us, 10);
    }

    /// Each accessor must convert its field using the unit in the field name.
    #[test]
    fn test_duration_conversions() {
        let defaults = SleepConfig::default();
        let ten_millis = Duration::from_millis(10);
        let ten_micros = Duration::from_micros(10);

        assert_eq!(defaults.shutdown_grace_duration(), ten_millis);
        assert_eq!(defaults.discovery_poll_duration(), ten_millis);
        assert_eq!(defaults.consume_sleep_duration(), Duration::from_micros(1));
        // 0.01 ms is 10 microseconds.
        assert_eq!(defaults.block_strategy_duration(), ten_micros);
        assert_eq!(defaults.consumer_busy_wait_duration(), ten_micros);
    }

    /// A fractional millisecond value must survive as a sub-millisecond duration.
    #[test]
    fn test_block_strategy_fractional() {
        let half_milli = SleepConfig {
            block_strategy_ms: 0.5,
            ..Default::default()
        };
        let expected = Duration::from_micros(500);
        assert_eq!(half_milli.block_strategy_duration(), expected);
    }
}