laminar_core/streaming/
config.rs1pub const DEFAULT_BUFFER_SIZE: usize = 2048;
5
6pub const MIN_BUFFER_SIZE: usize = 4;
8
9pub const MAX_BUFFER_SIZE: usize = 1 << 20; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
18pub enum BackpressureStrategy {
19 #[default]
21 Block,
22 DropOldest,
24 Reject,
26}
27
28impl std::str::FromStr for BackpressureStrategy {
29 type Err = String;
30
31 fn from_str(s: &str) -> Result<Self, Self::Err> {
32 match s.to_lowercase().as_str() {
33 "block" | "blocking" => Ok(Self::Block),
34 "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
35 "reject" | "error" => Ok(Self::Reject),
36 _ => Err(format!(
37 "invalid backpressure strategy: '{s}'. Valid values: block, drop_oldest, reject"
38 )),
39 }
40 }
41}
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
46pub enum WaitStrategy {
47 Spin,
49 #[default]
51 SpinYield,
52 Park,
54}
55
56impl std::str::FromStr for WaitStrategy {
57 type Err = String;
58
59 fn from_str(s: &str) -> Result<Self, Self::Err> {
60 match s.to_lowercase().as_str() {
61 "spin" => Ok(Self::Spin),
62 "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
63 "park" | "parking" => Ok(Self::Park),
64 _ => Err(format!(
65 "invalid wait strategy: '{s}'. Valid values: spin, spin_yield, park"
66 )),
67 }
68 }
69}
70
71#[derive(Debug, Clone)]
74pub struct ChannelConfig {
75 pub buffer_size: usize,
77 pub backpressure: BackpressureStrategy,
79 pub wait_strategy: WaitStrategy,
81 pub track_stats: bool,
83}
84
85impl Default for ChannelConfig {
86 fn default() -> Self {
87 Self {
88 buffer_size: DEFAULT_BUFFER_SIZE,
89 backpressure: BackpressureStrategy::Block,
90 wait_strategy: WaitStrategy::SpinYield,
91 track_stats: false,
92 }
93 }
94}
95
96impl ChannelConfig {
97 #[must_use]
99 pub fn with_buffer_size(buffer_size: usize) -> Self {
100 Self {
101 buffer_size: buffer_size.clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
102 ..Default::default()
103 }
104 }
105}
106
107#[derive(Debug, Clone, Default)]
109pub struct SourceConfig {
110 pub channel: ChannelConfig,
112 pub name: Option<String>,
114}
115
116impl SourceConfig {
117 #[must_use]
119 pub fn with_buffer_size(buffer_size: usize) -> Self {
120 Self {
121 channel: ChannelConfig::with_buffer_size(buffer_size),
122 name: None,
123 }
124 }
125
126 #[must_use]
128 pub fn named(name: impl Into<String>) -> Self {
129 Self {
130 channel: ChannelConfig::default(),
131 name: Some(name.into()),
132 }
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use super::*;
139
140 #[test]
141 fn test_default_config() {
142 let config = ChannelConfig::default();
143 assert_eq!(config.buffer_size, DEFAULT_BUFFER_SIZE);
144 }
145
146 #[test]
147 fn test_buffer_size_clamping() {
148 let config = ChannelConfig::with_buffer_size(0);
149 assert_eq!(config.buffer_size, MIN_BUFFER_SIZE);
150
151 let config = ChannelConfig::with_buffer_size(usize::MAX);
152 assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);
153 }
154
155 #[test]
156 fn test_source_config() {
157 let config = SourceConfig::with_buffer_size(512);
158 assert_eq!(config.channel.buffer_size, 512);
159 assert!(config.name.is_none());
160
161 let config = SourceConfig::named("my_source");
162 assert_eq!(config.name.as_deref(), Some("my_source"));
163 }
164
165 #[test]
166 fn test_backpressure_parse() {
167 assert_eq!(
168 "block".parse::<BackpressureStrategy>().unwrap(),
169 BackpressureStrategy::Block
170 );
171 assert_eq!(
172 "reject".parse::<BackpressureStrategy>().unwrap(),
173 BackpressureStrategy::Reject
174 );
175 assert!("invalid".parse::<BackpressureStrategy>().is_err());
176 }
177}