use std::{ops::RangeInclusive, num::NonZeroU8};
use std::time::Duration;
use reactive_mutiny::prelude::Instruments;
/// The forms a message may take, each one carrying the size associated with it.
///
/// The inherent `as_repr()` / `from_repr()` conversions pack a value into a `u8`,
/// keeping only the base-2 exponent of the size -- so only power-of-2 sizes are representable.
#[derive(Debug, PartialEq)]
pub enum MessageForms {
    /// Packed with the variant tag `0`.
    Textual {
        /// NOTE(review): presumably the upper bound for the size of a textual message -- confirm the unit with the users of this type
        max_size: u32,
    },
    /// Packed with the variant tag `1`.
    VariableBinary {
        /// NOTE(review): presumably the upper bound for the size of a variable-sized binary message -- confirm
        max_size: u32,
    },
    /// Packed with the variant tag `2`.
    MmapBinary {
        /// NOTE(review): presumably the exact size of the binary message -- confirm
        size: u32,
    },
}
impl MessageForms {
    /// Packs this value into a `u8`: the variant tag sits in bits 0..=1
    /// and the base-2 exponent of the size sits in bits 2..=6.
    ///
    /// Panics (inside `set_bits_from_power_of_2_u32()`) if the size is not a power of 2.
    const fn as_repr(&self) -> u8 {
        let (variant_tag, size) = match self {
            Self::Textual { max_size } => (0, *max_size),
            Self::VariableBinary { max_size } => (1, *max_size),
            Self::MmapBinary { size } => (2, *size),
        };
        // the tag is handed over as the initial word: bits 0..=1 are outside the range being written
        set_bits_from_power_of_2_u32(variant_tag, 2..=6, size) as u8
    }

    /// Rebuilds the value packed by `as_repr()`.
    ///
    /// Panics for the variant tag `3`, which `as_repr()` never produces.
    const fn from_repr(repr: u8) -> Self {
        let size = get_power_of_2_u32_bits(repr as u64, 2..=6);
        match repr & 0b11 {
            0 => Self::Textual { max_size: size },
            1 => Self::VariableBinary { max_size: size },
            2 => Self::MmapBinary { size },
            _ => unreachable!(),
        }
    }
}
/// The retrying strategies a configuration may select.
///
/// `as_repr()` packs a value into a `u16`: bits 0..=2 hold the variant tag and
/// bits 3..=10 hold the variant's `u8` payload (zero for the payload-less variants).
#[derive(Debug, PartialEq)]
pub enum RetryingStrategies {
    /// Variant tag `0`; carries no payload.
    DoNotRetry,
    /// Variant tag `1`; carries no payload.
    EndCommunications,
    /// Variant tag `2`.
    /// NOTE(review): the meaning/unit of the `u8` limit is not visible in this file -- confirm
    RetryWithBackoffUpTo(u8),
    /// Variant tag `3`; going by the name, the `u8` is a limit in milliseconds.
    RetryYieldingForUpToMillis(u8),
    /// Variant tag `4`; going by the name, the `u8` is a limit in milliseconds.
    RetrySpinningForUpToMillis(u8),
}

impl RetryingStrategies {
    /// Packs this value into the 11 lowest bits of a `u16` -- see the type docs for the layout.
    const fn as_repr(&self) -> u16 {
        let (variant_tag, payload): (u16, u8) = match self {
            Self::DoNotRetry => (0, 0),
            Self::EndCommunications => (1, 0),
            Self::RetryWithBackoffUpTo(limit) => (2, *limit),
            Self::RetryYieldingForUpToMillis(limit) => (3, *limit),
            Self::RetrySpinningForUpToMillis(limit) => (4, *limit),
        };
        variant_tag | ((payload as u16) << 3)
    }

    /// Rebuilds the value packed by `as_repr()`. Bits above the payload are discarded.
    ///
    /// Panics for the variant tags `5..=7`, which `as_repr()` never produces.
    const fn from_repr(repr: u16) -> Self {
        let payload = (repr >> 3) as u8;
        match repr & 0b111 {
            0 => Self::DoNotRetry,
            1 => Self::EndCommunications,
            2 => Self::RetryWithBackoffUpTo(payload),
            3 => Self::RetryYieldingForUpToMillis(payload),
            4 => Self::RetrySpinningForUpToMillis(payload),
            _ => unreachable!(),
        }
    }
}
/// Optional socket settings; a `None` field means "not specified".
///
/// The inherent `as_repr()` / `from_repr()` conversions pack a value into the 16 lowest bits of a `u32`.
#[derive(Debug, PartialEq)]
pub struct SocketOptions {
    /// Packed verbatim into bits 8..=15, with `None` stored as `0`.
    /// NOTE(review): presumably the IP "time to live" of outgoing packets -- confirm
    pub hops_to_live: Option<NonZeroU8>,
    /// Packed as a presence flag (bit 2) plus a base-2 exponent (bits 3..=7):
    /// only powers of 2 are representable and `Some(0)` is read back as `Some(1)`.
    /// NOTE(review): presumably the socket's linger time, in milliseconds -- confirm
    pub linger_millis: Option<u32>,
    /// Packed as a presence flag (bit 0) plus the value itself (bit 1).
    /// NOTE(review): presumably the TCP "no delay" switch -- confirm
    pub no_delay: Option<bool>,
}
impl SocketOptions {
    /// Packs this value into the 16 lowest bits of a `u32`:
    ///   - bit 0: `no_delay` is `Some`
    ///   - bit 1: the `no_delay` value (`0` when `None`)
    ///   - bit 2: `linger_millis` is `Some`
    ///   - bits 3..=7: base-2 exponent of `linger_millis` (`0` when `None` or `Some(0)`)
    ///   - bits 8..=15: `hops_to_live` (`0` when `None`)
    ///
    /// Panics (inside `set_bits_from_power_of_2_u32()`) if `linger_millis` is
    /// neither `0` nor a power of 2.
    const fn as_repr(&self) -> u32 {
        let no_delay_flag = self.no_delay.is_some() as u32;
        let no_delay_data = (unwrap_bool_or_default(self.no_delay) as u32) << 1;
        let linger_flag = (self.linger_millis.is_some() as u32) << 2;
        let linger_data = match self.linger_millis {
            // zero has no base-2 exponent: it shares the all-zeros encoding with `1`
            None | Some(0) => 0,
            Some(millis) => set_bits_from_power_of_2_u32(0, 3..=7, millis) as u32,
        };
        let hops = (unwrap_non_zero_u8_or_default(self.hops_to_live) as u32) << 8;
        no_delay_flag | no_delay_data | linger_flag | linger_data | hops
    }

    /// Rebuilds the value packed by `as_repr()`.
    ///
    /// A `linger_millis` of `Some(0)` comes back as `Some(1)`, as both share the same encoding.
    const fn from_repr(repr: u32) -> Self {
        let no_delay = if repr & 0b001 != 0 {
            Some(repr & 0b010 != 0)
        } else {
            None
        };
        let linger_millis = if repr & 0b100 != 0 {
            Some(get_power_of_2_u32_bits(repr as u64, 3..=7))
        } else {
            None
        };
        let hops = ((repr >> 8) & 0xFF) as u8;
        Self {
            // `NonZeroU8::new()` already maps 0 to `None`
            hops_to_live: NonZeroU8::new(hops),
            linger_millis,
            no_delay,
        }
    }
}
/// A set of configurations that can be packed into -- and restored from -- a single `u64`
/// through the inherent `into()` / `from()` methods.
///
/// All the `u32` and `u16` fields are stored as base-2 exponents, so they must be powers of 2.
#[derive(Debug, PartialEq)]
pub struct ConstConfig {
    /// Must be a power of 2. NOTE(review): presumably the capacity of the sender channel -- confirm
    pub sender_channel_size: u32,
    /// Must be a power of 2. NOTE(review): presumably the capacity of the receiver channel -- confirm
    pub receiver_channel_size: u32,
    /// Must be a power of 2. NOTE(review): presumably the maximum size of a sent message -- confirm
    pub sender_max_msg_size: u32,
    /// Must be a power of 2. NOTE(review): presumably the maximum size of a received message -- confirm
    pub receiver_max_msg_size: u32,
    /// Must be a power of 2. Exposed as a `Duration`, in milliseconds, by `extract_flush_timeout()`.
    pub flush_timeout_millis: u16,
    /// Packed through `RetryingStrategies::as_repr()`.
    pub retrying_strategy: RetryingStrategies,
    /// Packed through `SocketOptions::as_repr()`.
    pub socket_options: SocketOptions,
    /// Packed through `Instruments::into()`.
    pub executor_instruments: Instruments,
}
#[warn(non_snake_case)]
impl ConstConfig {
    #![allow(non_snake_case)]

    // Where each field lives inside the packed `u64` (inclusive bit ranges, bit 0 being the least significant)

    /// 5 bits: base-2 exponent of `sender_channel_size`
    const SENDER_CHANNEL_SIZE: RangeInclusive<usize> = 0..=4;
    /// 5 bits: base-2 exponent of `receiver_channel_size`
    const RECEIVER_CHANNEL_SIZE: RangeInclusive<usize> = 5..=9;
    /// 5 bits: base-2 exponent of `sender_max_msg_size`
    const SENDER_MAX_MSG_SIZE: RangeInclusive<usize> = 10..=14;
    /// 5 bits: base-2 exponent of `receiver_max_msg_size`
    const RECEIVER_MAX_MSG_SIZE: RangeInclusive<usize> = 15..=19;
    /// 4 bits: base-2 exponent of `flush_timeout_millis`
    const FLUSH_TIMEOUT_MILLIS: RangeInclusive<usize> = 20..=23;
    /// 11 bits: `RetryingStrategies::as_repr()`
    const RETRYING_STRATEGY: RangeInclusive<usize> = 24..=34;
    /// 16 bits: `SocketOptions::as_repr()`
    const SOCKET_OPTIONS: RangeInclusive<usize> = 35..=50;
    /// 9 bits: `Instruments::into()`
    const EXECUTOR_INSTRUMENTS: RangeInclusive<usize> = 51..=59;

    /// Builds the default configuration, usable in `const` contexts.
    pub const fn default() -> ConstConfig {
        Self {
            sender_channel_size: 1024,
            receiver_channel_size: 1024,
            sender_max_msg_size: 1024,
            receiver_max_msg_size: 1024,
            flush_timeout_millis: 256,
            retrying_strategy: RetryingStrategies::RetryWithBackoffUpTo(20),
            socket_options: SocketOptions {
                hops_to_live: NonZeroU8::new(255),
                linger_millis: Some(128),
                no_delay: Some(true),
            },
            executor_instruments: Instruments::from(Instruments::NoInstruments.into()),
        }
    }

    /// Packs every field into a single `u64`, which `from()` is able to unpack.
    ///
    /// Panics if any of the sizes or the flush timeout is not a power of 2,
    /// or if a packed field does not fit in the bits reserved for it.
    pub const fn into(self) -> u64 {
        let packed = set_bits_from_power_of_2_u32(0, Self::SENDER_CHANNEL_SIZE, self.sender_channel_size);
        let packed = set_bits_from_power_of_2_u32(packed, Self::RECEIVER_CHANNEL_SIZE, self.receiver_channel_size);
        let packed = set_bits_from_power_of_2_u32(packed, Self::SENDER_MAX_MSG_SIZE, self.sender_max_msg_size);
        let packed = set_bits_from_power_of_2_u32(packed, Self::RECEIVER_MAX_MSG_SIZE, self.receiver_max_msg_size);
        let packed = set_bits_from_power_of_2_u16(packed, Self::FLUSH_TIMEOUT_MILLIS, self.flush_timeout_millis);
        let packed = set_bits(packed, Self::RETRYING_STRATEGY, self.retrying_strategy.as_repr() as u64);
        let packed = set_bits(packed, Self::SOCKET_OPTIONS, self.socket_options.as_repr() as u64);
        set_bits(packed, Self::EXECUTOR_INSTRUMENTS, self.executor_instruments.into() as u64)
    }

    /// Unpacks a `u64` produced by `into()`.
    ///
    /// Panics if the retrying strategy bits hold a variant tag `RetryingStrategies::from_repr()` does not know.
    pub const fn from(config: u64) -> Self {
        Self {
            sender_channel_size: get_power_of_2_u32_bits(config, Self::SENDER_CHANNEL_SIZE),
            receiver_channel_size: get_power_of_2_u32_bits(config, Self::RECEIVER_CHANNEL_SIZE),
            sender_max_msg_size: get_power_of_2_u32_bits(config, Self::SENDER_MAX_MSG_SIZE),
            receiver_max_msg_size: get_power_of_2_u32_bits(config, Self::RECEIVER_MAX_MSG_SIZE),
            flush_timeout_millis: get_power_of_2_u16_bits(config, Self::FLUSH_TIMEOUT_MILLIS),
            retrying_strategy: RetryingStrategies::from_repr(get_bits(config, Self::RETRYING_STRATEGY) as u16),
            socket_options: SocketOptions::from_repr(get_bits(config, Self::SOCKET_OPTIONS) as u32),
            executor_instruments: Instruments::from(get_bits(config, Self::EXECUTOR_INSTRUMENTS) as usize),
        }
    }

    // Each extractor below unpacks the whole `config` (through `from()`) and hands back a single field

    /// Reads `sender_channel_size` out of a packed `config`.
    pub const fn extract_sender_channel_size(config: u64) -> u32 {
        Self::from(config).sender_channel_size
    }

    /// Reads `receiver_channel_size` out of a packed `config`.
    pub const fn extract_receiver_channel_size(config: u64) -> u32 {
        Self::from(config).receiver_channel_size
    }

    /// Reads `sender_max_msg_size` out of a packed `config`.
    pub const fn extract_sender_max_msg_size(config: u64) -> u32 {
        Self::from(config).sender_max_msg_size
    }

    /// Reads `receiver_max_msg_size` out of a packed `config`.
    pub const fn extract_receiver_max_msg_size(config: u64) -> u32 {
        Self::from(config).receiver_max_msg_size
    }

    /// Reads `flush_timeout_millis` out of a packed `config`, as a `Duration`.
    pub const fn extract_flush_timeout(config: u64) -> Duration {
        Duration::from_millis(Self::from(config).flush_timeout_millis as u64)
    }

    /// Reads `retrying_strategy` out of a packed `config`.
    pub const fn extract_retrying_strategy(config: u64) -> RetryingStrategies {
        Self::from(config).retrying_strategy
    }

    /// Reads `socket_options` out of a packed `config`.
    pub const fn extract_socket_options(config: u64) -> SocketOptions {
        Self::from(config).socket_options
    }

    /// Reads `executor_instruments` out of a packed `config`, in its `usize` form.
    pub const fn extract_executor_instruments(config: u64) -> usize {
        Self::from(config).executor_instruments.into()
    }
}
/// Returns the unsigned value stored in the inclusive `bits` range of `config`
/// (bit 0 being the least significant one), shifted down so it starts at bit 0.
///
/// Ranges of any width are accepted, up to the full `0..=63`.
///
/// # Panics
/// If `bits` is empty (`start > end`) or reaches past bit 63.
const fn get_bits(config: u64, bits: RangeInclusive<usize>) -> u64 {
    let (first_bit, last_bit) = (*bits.start(), *bits.end());
    assert!(first_bit <= last_bit && last_bit < 64, "get_bits(): `bits` must be a non-empty range within 0..=63");
    let bits_len = last_bit - first_bit + 1;
    // `(1 << bits_len) - 1` would overflow for a 64-bit wide range: shifting an all-ones word right never does
    let mask = u64::MAX >> (64 - bits_len);
    (config >> first_bit) & mask
}
/// Returns `config` with the inclusive `bits` range (bit 0 being the least significant one)
/// overwritten by `value`; all the other bits are preserved.
///
/// Ranges of any width are accepted, up to the full `0..=63`.
///
/// # Panics
/// If `bits` is empty (`start > end`) or reaches past bit 63, or if `value` needs more bits
/// than `bits` provides. When evaluated in a `const` context, the panic is a compilation error.
const fn set_bits(config: u64, bits: RangeInclusive<usize>, value: u64) -> u64 {
    let (first_bit, last_bit) = (*bits.start(), *bits.end());
    assert!(first_bit <= last_bit && last_bit < 64, "set_bits(): `bits` must be a non-empty range within 0..=63");
    // `(1 << bits_len) - 1` would overflow for a 64-bit wide range: shifting an all-ones word right never does
    let mask = u64::MAX >> (63 - (last_bit - first_bit));
    // callers can reach this with an out-of-range value, so state the cause explicitly
    assert!(value <= mask, "set_bits(): `value` does not fit in the given `bits` range");
    (config & !(mask << first_bit)) | (value << first_bit)
}
/// Reads the base-2 exponent stored in the `bits` range of `config`
/// and returns the power of 2 it stands for, as a `u32`.
///
/// `bits` is expected to be at most 5 bits wide, so the exponent stays below 32.
const fn get_power_of_2_u32_bits(config: u64, bits: RangeInclusive<usize>) -> u32 {
    1u32 << get_bits(config, bits)
}
/// Returns `config` with the `bits` range overwritten by the base-2 exponent of
/// `power_of_2_u32_value` -- the counterpart of `get_power_of_2_u32_bits()`.
///
/// # Panics
/// If `power_of_2_u32_value` is not a power of 2 (which includes `0`), or if `set_bits()`
/// rejects the exponent for the given `bits`. When evaluated in a `const` context,
/// the panic is a compilation error.
const fn set_bits_from_power_of_2_u32(config: u64, bits: RangeInclusive<usize>, power_of_2_u32_value: u32) -> u64 {
    // a wrong value may be provided by whoever builds the configuration, so state the cause explicitly
    assert!(power_of_2_u32_value.is_power_of_two(), "set_bits_from_power_of_2_u32(): the value must be a power of 2");
    set_bits(config, bits, power_of_2_u32_value.ilog2() as u64)
}
/// Reads the base-2 exponent stored in the `bits` range of `config`
/// and returns the power of 2 it stands for, as a `u16`.
///
/// `bits` is expected to be at most 4 bits wide, so the exponent stays below 16.
const fn get_power_of_2_u16_bits(config: u64, bits: RangeInclusive<usize>) -> u16 {
    1u16 << get_bits(config, bits)
}
/// Returns `config` with the `bits` range overwritten by the base-2 exponent of
/// `power_of_2_u16_value` -- the counterpart of `get_power_of_2_u16_bits()`.
///
/// # Panics
/// If `power_of_2_u16_value` is not a power of 2 (which includes `0`), or if `set_bits()`
/// rejects the exponent for the given `bits`. When evaluated in a `const` context,
/// the panic is a compilation error.
const fn set_bits_from_power_of_2_u16(config: u64, bits: RangeInclusive<usize>, power_of_2_u16_value: u16) -> u64 {
    // a wrong value may be provided by whoever builds the configuration, so state the cause explicitly
    assert!(power_of_2_u16_value.is_power_of_two(), "set_bits_from_power_of_2_u16(): the value must be a power of 2");
    set_bits(config, bits, power_of_2_u16_value.ilog2() as u64)
}
/// Reads the base-2 exponent stored in the `bits` range of `config`
/// and returns the power of 2 it stands for, as a `u8`.
///
/// `bits` is expected to be at most 3 bits wide, so the exponent stays below 8.
const fn _get_power_of_2_u8_bits(config: u64, bits: RangeInclusive<usize>) -> u8 {
    1u8 << get_bits(config, bits)
}

/// Misnamed alias of `_get_power_of_2_u8_bits()`, kept so any existing reference keeps compiling:
/// despite the "power of 3" in the name, the decoded value is (and always was) a power of 2.
const fn _get_power_of_3_u8_bits(config: u64, bits: RangeInclusive<usize>) -> u8 {
    _get_power_of_2_u8_bits(config, bits)
}
/// Returns `config` with the `bits` range overwritten by the base-2 exponent of `power_of_2_u8_value`.
///
/// # Panics
/// If `power_of_2_u8_value` is not a power of 2 (which includes `0`), or if `set_bits()`
/// rejects the exponent for the given `bits`. When evaluated in a `const` context,
/// the panic is a compilation error.
const fn _set_bits_from_power_of_2_u8(config: u64, bits: RangeInclusive<usize>, power_of_2_u8_value: u8) -> u64 {
    // a wrong value may be provided by whoever builds the configuration, so state the cause explicitly
    assert!(power_of_2_u8_value.is_power_of_two(), "_set_bits_from_power_of_2_u8(): the value must be a power of 2");
    set_bits(config, bits, power_of_2_u8_value.ilog2() as u64)
}
/// `const` stand-in for `Option::unwrap_or_default()`: gives `false` for `None`.
const fn unwrap_bool_or_default(option: Option<bool>) -> bool {
    // only `Some(true)` yields `true`: both `Some(false)` and `None` yield `false`
    matches!(option, Some(true))
}
/// `const` stand-in for `Option::unwrap_or_default()`: gives `0` for `None`.
const fn _unwrap_u8_or_default(option: Option<u8>) -> u8 {
    if let Some(value) = option {
        value
    } else {
        0
    }
}
/// `const` stand-in for `Option::unwrap_or_default()`: gives `0` for `None`.
const fn _unwrap_u16_or_default(option: Option<u16>) -> u16 {
    if let Some(value) = option {
        value
    } else {
        0
    }
}
/// `const` stand-in for `Option::unwrap_or_default()`: gives `0` for `None`.
const fn _unwrap_u32_or_default(option: Option<u32>) -> u32 {
    if let Some(value) = option {
        value
    } else {
        0
    }
}
/// Gives the plain `u8` held by `option`, or `0` -- the one value a `NonZeroU8` cannot hold -- for `None`.
const fn unwrap_non_zero_u8_or_default(option: Option<NonZeroU8>) -> u8 {
    if let Some(non_zero) = option {
        non_zero.get()
    } else {
        0
    }
}
/// Round-trip checks for the packed representations defined in this module.
#[cfg(any(test,doc))]
mod tests {
    use super::*;

    /// Every subject must survive a `RetryingStrategies::as_repr()` -> `from_repr()` round trip:
    /// the payload-less variants, then each payload-carrying variant with every single-bit `u8` payload.
    #[cfg_attr(not(doc),test)]
    fn retrying_strategies_repr() {
        let single_bit_payloads = || (0..8).map(|n| 1u8 << n);
        let subjects = vec![
            RetryingStrategies::DoNotRetry,
            RetryingStrategies::EndCommunications,
        ]
        .into_iter()
        .chain(single_bit_payloads().map(RetryingStrategies::RetryWithBackoffUpTo))
        .chain(single_bit_payloads().map(RetryingStrategies::RetryYieldingForUpToMillis))
        .chain(single_bit_payloads().map(RetryingStrategies::RetrySpinningForUpToMillis));
        for expected in subjects {
            let repr = expected.as_repr();
            let observed = RetryingStrategies::from_repr(repr);
            assert_eq!(observed, expected, "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", expected, repr, observed);
        }
    }

    /// Every subject must survive a `SocketOptions::as_repr()` -> `from_repr()` round trip:
    /// each field set on its own, then a sweep of single-bit values for the numeric fields.
    #[cfg_attr(not(doc),test)]
    fn socket_options_repr() {
        let unset = || SocketOptions { hops_to_live: None, linger_millis: None, no_delay: None };
        let subjects = vec![
            unset(),
            SocketOptions { no_delay: Some(false), ..unset() },
            SocketOptions { no_delay: Some(true), ..unset() },
            SocketOptions { linger_millis: Some(1), ..unset() },
            SocketOptions { hops_to_live: NonZeroU8::new(1), ..unset() },
        ]
        .into_iter()
        .chain((0..31).map(|n| SocketOptions { linger_millis: Some(1 << n), ..unset() }))
        .chain((0..8).map(|n| SocketOptions { hops_to_live: NonZeroU8::new(1 << n), ..unset() }));
        for expected in subjects {
            let repr = expected.as_repr();
            let observed = SocketOptions::from_repr(repr);
            assert_eq!(observed, expected, "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", expected, repr, observed);
        }
    }

    /// A `ConstConfig` with non-default values must survive an `into()` -> `from()` round trip.
    #[cfg_attr(not(doc),test)]
    fn const_config() {
        // a closure, as `ConstConfig` is consumed by `into()` and cannot be cloned
        let new_subject = || ConstConfig {
            sender_channel_size: 4096,
            receiver_channel_size: 2048,
            sender_max_msg_size: 256,
            receiver_max_msg_size: 512,
            flush_timeout_millis: 256,
            retrying_strategy: RetryingStrategies::RetryWithBackoffUpTo(14),
            socket_options: SocketOptions {
                hops_to_live: NonZeroU8::new(255),
                linger_millis: Some(128),
                no_delay: Some(true),
            },
            executor_instruments: Instruments::from(Instruments::LogsWithExpensiveMetrics.into()),
        };
        let packed = new_subject().into();
        let observed = ConstConfig::from(packed);
        assert_eq!(observed, new_subject(), "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", new_subject(), packed, observed);
    }
}