pub const DEFAULT_BUFFER_SIZE: usize = 2048;
pub const MIN_BUFFER_SIZE: usize = 4;
pub const MAX_BUFFER_SIZE: usize = 1 << 20;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
#[default]
Block,
DropOldest,
Reject,
}
impl std::str::FromStr for BackpressureStrategy {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"block" | "blocking" => Ok(Self::Block),
"drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
"reject" | "error" => Ok(Self::Reject),
_ => Err(format!(
"invalid backpressure strategy: '{s}'. Valid values: block, drop_oldest, reject"
)),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
Spin,
#[default]
SpinYield,
Park,
}
impl std::str::FromStr for WaitStrategy {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"spin" => Ok(Self::Spin),
"spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
"park" | "parking" => Ok(Self::Park),
_ => Err(format!(
"invalid wait strategy: '{s}'. Valid values: spin, spin_yield, park"
)),
}
}
}
#[derive(Debug, Clone)]
pub struct ChannelConfig {
pub buffer_size: usize,
pub backpressure: BackpressureStrategy,
pub wait_strategy: WaitStrategy,
pub track_stats: bool,
}
impl Default for ChannelConfig {
fn default() -> Self {
Self {
buffer_size: DEFAULT_BUFFER_SIZE,
backpressure: BackpressureStrategy::Block,
wait_strategy: WaitStrategy::SpinYield,
track_stats: false,
}
}
}
impl ChannelConfig {
#[must_use]
pub fn with_buffer_size(buffer_size: usize) -> Self {
Self {
buffer_size: buffer_size.clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
..Default::default()
}
}
}
#[derive(Debug, Clone, Default)]
pub struct SourceConfig {
pub channel: ChannelConfig,
pub name: Option<String>,
}
impl SourceConfig {
#[must_use]
pub fn with_buffer_size(buffer_size: usize) -> Self {
Self {
channel: ChannelConfig::with_buffer_size(buffer_size),
name: None,
}
}
#[must_use]
pub fn named(name: impl Into<String>) -> Self {
Self {
channel: ChannelConfig::default(),
name: Some(name.into()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_config() {
let config = ChannelConfig::default();
assert_eq!(config.buffer_size, DEFAULT_BUFFER_SIZE);
}
#[test]
fn test_buffer_size_clamping() {
let config = ChannelConfig::with_buffer_size(0);
assert_eq!(config.buffer_size, MIN_BUFFER_SIZE);
let config = ChannelConfig::with_buffer_size(usize::MAX);
assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);
}
#[test]
fn test_source_config() {
let config = SourceConfig::with_buffer_size(512);
assert_eq!(config.channel.buffer_size, 512);
assert!(config.name.is_none());
let config = SourceConfig::named("my_source");
assert_eq!(config.name.as_deref(), Some("my_source"));
}
#[test]
fn test_backpressure_parse() {
assert_eq!(
"block".parse::<BackpressureStrategy>().unwrap(),
BackpressureStrategy::Block
);
assert_eq!(
"reject".parse::<BackpressureStrategy>().unwrap(),
BackpressureStrategy::Reject
);
assert!("invalid".parse::<BackpressureStrategy>().is_err());
}
}