1pub const DEFAULT_BUFFER_SIZE: usize = 2048;
7
8pub const MIN_BUFFER_SIZE: usize = 4;
10
11pub const MAX_BUFFER_SIZE: usize = 1 << 20; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
19pub enum BackpressureStrategy {
20 #[default]
24 Block,
25
26 DropOldest,
30
31 Reject,
35}
36
37impl std::str::FromStr for BackpressureStrategy {
38 type Err = String;
39
40 fn from_str(s: &str) -> Result<Self, Self::Err> {
41 match s.to_lowercase().as_str() {
42 "block" | "blocking" => Ok(Self::Block),
43 "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
44 "reject" | "error" => Ok(Self::Reject),
45 _ => Err(format!(
46 "invalid backpressure strategy: '{s}'. Valid values: block, drop_oldest, reject"
47 )),
48 }
49 }
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
56pub enum WaitStrategy {
57 Spin,
61
62 #[default]
66 SpinYield,
67
68 Park,
72}
73
74impl std::str::FromStr for WaitStrategy {
75 type Err = String;
76
77 fn from_str(s: &str) -> Result<Self, Self::Err> {
78 match s.to_lowercase().as_str() {
79 "spin" => Ok(Self::Spin),
80 "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
81 "park" | "parking" => Ok(Self::Park),
82 _ => Err(format!(
83 "invalid wait strategy: '{s}'. Valid values: spin, spin_yield, park"
84 )),
85 }
86 }
87}
88
89#[derive(Debug, Clone)]
91pub struct ChannelConfig {
92 pub buffer_size: usize,
94
95 pub backpressure: BackpressureStrategy,
97
98 pub wait_strategy: WaitStrategy,
100
101 pub track_stats: bool,
103}
104
105impl Default for ChannelConfig {
106 fn default() -> Self {
107 Self {
108 buffer_size: DEFAULT_BUFFER_SIZE,
109 backpressure: BackpressureStrategy::Block,
110 wait_strategy: WaitStrategy::SpinYield,
111 track_stats: false,
112 }
113 }
114}
115
116impl ChannelConfig {
117 #[must_use]
119 pub fn with_buffer_size(buffer_size: usize) -> Self {
120 Self {
121 buffer_size: buffer_size.clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
122 ..Default::default()
123 }
124 }
125
126 #[must_use]
128 pub fn builder() -> ChannelConfigBuilder {
129 ChannelConfigBuilder::default()
130 }
131
132 #[must_use]
134 pub fn effective_buffer_size(&self) -> usize {
135 self.buffer_size
136 .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE)
137 .next_power_of_two()
138 }
139}
140
141#[derive(Debug, Default)]
143pub struct ChannelConfigBuilder {
144 buffer_size: Option<usize>,
145 backpressure: Option<BackpressureStrategy>,
146 wait_strategy: Option<WaitStrategy>,
147 track_stats: Option<bool>,
148}
149
150impl ChannelConfigBuilder {
151 #[must_use]
153 pub fn buffer_size(mut self, size: usize) -> Self {
154 self.buffer_size = Some(size);
155 self
156 }
157
158 #[must_use]
160 pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
161 self.backpressure = Some(strategy);
162 self
163 }
164
165 #[must_use]
167 pub fn wait_strategy(mut self, strategy: WaitStrategy) -> Self {
168 self.wait_strategy = Some(strategy);
169 self
170 }
171
172 #[must_use]
174 pub fn track_stats(mut self, enabled: bool) -> Self {
175 self.track_stats = Some(enabled);
176 self
177 }
178
179 #[must_use]
181 pub fn build(self) -> ChannelConfig {
182 ChannelConfig {
183 buffer_size: self
184 .buffer_size
185 .unwrap_or(DEFAULT_BUFFER_SIZE)
186 .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
187 backpressure: self.backpressure.unwrap_or_default(),
188 wait_strategy: self.wait_strategy.unwrap_or_default(),
189 track_stats: self.track_stats.unwrap_or(false),
190 }
191 }
192}
193
194#[derive(Debug, Clone, Default)]
196pub struct SourceConfig {
197 pub channel: ChannelConfig,
199
200 pub name: Option<String>,
202}
203
204impl SourceConfig {
205 #[must_use]
207 pub fn with_buffer_size(buffer_size: usize) -> Self {
208 Self {
209 channel: ChannelConfig::with_buffer_size(buffer_size),
210 name: None,
211 }
212 }
213
214 #[must_use]
216 pub fn named(name: impl Into<String>) -> Self {
217 Self {
218 channel: ChannelConfig::default(),
219 name: Some(name.into()),
220 }
221 }
222}
223
224#[derive(Debug, Clone, Default)]
226pub struct SinkConfig {
227 pub channel: ChannelConfig,
229
230 pub name: Option<String>,
232
233 pub max_subscribers: usize,
235}
236
237impl SinkConfig {
238 #[must_use]
240 pub fn with_buffer_size(buffer_size: usize) -> Self {
241 Self {
242 channel: ChannelConfig::with_buffer_size(buffer_size),
243 name: None,
244 max_subscribers: 0,
245 }
246 }
247
248 #[must_use]
250 pub fn named(name: impl Into<String>) -> Self {
251 Self {
252 channel: ChannelConfig::default(),
253 name: Some(name.into()),
254 max_subscribers: 0,
255 }
256 }
257}
258
259#[derive(Debug, Clone, Default)]
261pub struct ChannelStats {
262 pub items_pushed: u64,
264
265 pub items_popped: u64,
267
268 pub push_blocked: u64,
270
271 pub items_dropped: u64,
273
274 pub pop_empty: u64,
276}
277
278impl ChannelStats {
279 #[must_use]
281 pub fn in_flight(&self) -> u64 {
282 self.items_pushed.saturating_sub(self.items_popped)
283 }
284
285 #[must_use]
287 #[allow(clippy::cast_precision_loss)] pub fn drop_rate(&self) -> f64 {
289 if self.items_pushed == 0 {
290 0.0
291 } else {
292 self.items_dropped as f64 / self.items_pushed as f64
293 }
294 }
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300
301 #[test]
302 fn test_default_config() {
303 let config = ChannelConfig::default();
304 assert_eq!(config.buffer_size, DEFAULT_BUFFER_SIZE);
305 assert_eq!(config.backpressure, BackpressureStrategy::Block);
306 assert_eq!(config.wait_strategy, WaitStrategy::SpinYield);
307 assert!(!config.track_stats);
308 }
309
310 #[test]
311 fn test_config_builder() {
312 let config = ChannelConfig::builder()
313 .buffer_size(1024)
314 .backpressure(BackpressureStrategy::DropOldest)
315 .wait_strategy(WaitStrategy::Park)
316 .track_stats(true)
317 .build();
318
319 assert_eq!(config.buffer_size, 1024);
320 assert_eq!(config.backpressure, BackpressureStrategy::DropOldest);
321 assert_eq!(config.wait_strategy, WaitStrategy::Park);
322 assert!(config.track_stats);
323 }
324
325 #[test]
326 fn test_effective_buffer_size() {
327 let config = ChannelConfig::with_buffer_size(100);
328 assert_eq!(config.effective_buffer_size(), 128); let config = ChannelConfig::with_buffer_size(1);
331 assert_eq!(
332 config.effective_buffer_size(),
333 MIN_BUFFER_SIZE.next_power_of_two()
334 );
335 }
336
337 #[test]
338 fn test_buffer_size_clamping() {
339 let config = ChannelConfig::with_buffer_size(0);
340 assert_eq!(config.buffer_size, MIN_BUFFER_SIZE);
341
342 let config = ChannelConfig::with_buffer_size(usize::MAX);
343 assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);
344 }
345
346 #[test]
347 fn test_source_config() {
348 let config = SourceConfig::with_buffer_size(512);
349 assert_eq!(config.channel.buffer_size, 512);
350 assert!(config.name.is_none());
351
352 let config = SourceConfig::named("my_source");
353 assert_eq!(config.name.as_deref(), Some("my_source"));
354 }
355
356 #[test]
357 fn test_sink_config() {
358 let config = SinkConfig::with_buffer_size(256);
359 assert_eq!(config.channel.buffer_size, 256);
360 assert!(config.name.is_none());
361 assert_eq!(config.max_subscribers, 0);
362
363 let config = SinkConfig::named("my_sink");
364 assert_eq!(config.name.as_deref(), Some("my_sink"));
365 }
366
367 #[test]
368 fn test_channel_stats() {
369 let mut stats = ChannelStats {
370 items_pushed: 100,
371 ..ChannelStats::default()
372 };
373 stats.items_popped = 80;
374 stats.items_dropped = 5;
375
376 assert_eq!(stats.in_flight(), 20);
377 assert!((stats.drop_rate() - 0.05).abs() < f64::EPSILON);
378 }
379
380 #[test]
381 fn test_channel_stats_empty() {
382 let stats = ChannelStats::default();
383 assert_eq!(stats.in_flight(), 0);
384 assert!((stats.drop_rate() - 0.0).abs() < f64::EPSILON);
385 }
386}