1pub const DEFAULT_BUFFER_SIZE: usize = 2048;
7
8pub const MIN_BUFFER_SIZE: usize = 4;
10
11pub const MAX_BUFFER_SIZE: usize = 1 << 20; #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
19pub enum BackpressureStrategy {
20 #[default]
24 Block,
25
26 DropOldest,
30
31 Reject,
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
41pub enum WaitStrategy {
42 Spin,
46
47 #[default]
51 SpinYield,
52
53 Park,
57}
58
59#[derive(Debug, Clone)]
61pub struct ChannelConfig {
62 pub buffer_size: usize,
64
65 pub backpressure: BackpressureStrategy,
67
68 pub wait_strategy: WaitStrategy,
70
71 pub track_stats: bool,
73}
74
75impl Default for ChannelConfig {
76 fn default() -> Self {
77 Self {
78 buffer_size: DEFAULT_BUFFER_SIZE,
79 backpressure: BackpressureStrategy::Block,
80 wait_strategy: WaitStrategy::SpinYield,
81 track_stats: false,
82 }
83 }
84}
85
86impl ChannelConfig {
87 #[must_use]
89 pub fn with_buffer_size(buffer_size: usize) -> Self {
90 Self {
91 buffer_size: buffer_size.clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
92 ..Default::default()
93 }
94 }
95
96 #[must_use]
98 pub fn builder() -> ChannelConfigBuilder {
99 ChannelConfigBuilder::default()
100 }
101
102 #[must_use]
104 pub fn effective_buffer_size(&self) -> usize {
105 self.buffer_size
106 .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE)
107 .next_power_of_two()
108 }
109}
110
111#[derive(Debug, Default)]
113pub struct ChannelConfigBuilder {
114 buffer_size: Option<usize>,
115 backpressure: Option<BackpressureStrategy>,
116 wait_strategy: Option<WaitStrategy>,
117 track_stats: Option<bool>,
118}
119
120impl ChannelConfigBuilder {
121 #[must_use]
123 pub fn buffer_size(mut self, size: usize) -> Self {
124 self.buffer_size = Some(size);
125 self
126 }
127
128 #[must_use]
130 pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
131 self.backpressure = Some(strategy);
132 self
133 }
134
135 #[must_use]
137 pub fn wait_strategy(mut self, strategy: WaitStrategy) -> Self {
138 self.wait_strategy = Some(strategy);
139 self
140 }
141
142 #[must_use]
144 pub fn track_stats(mut self, enabled: bool) -> Self {
145 self.track_stats = Some(enabled);
146 self
147 }
148
149 #[must_use]
151 pub fn build(self) -> ChannelConfig {
152 ChannelConfig {
153 buffer_size: self
154 .buffer_size
155 .unwrap_or(DEFAULT_BUFFER_SIZE)
156 .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
157 backpressure: self.backpressure.unwrap_or_default(),
158 wait_strategy: self.wait_strategy.unwrap_or_default(),
159 track_stats: self.track_stats.unwrap_or(false),
160 }
161 }
162}
163
164#[derive(Debug, Clone, Default)]
166pub struct SourceConfig {
167 pub channel: ChannelConfig,
169
170 pub name: Option<String>,
172}
173
174impl SourceConfig {
175 #[must_use]
177 pub fn with_buffer_size(buffer_size: usize) -> Self {
178 Self {
179 channel: ChannelConfig::with_buffer_size(buffer_size),
180 name: None,
181 }
182 }
183
184 #[must_use]
186 pub fn named(name: impl Into<String>) -> Self {
187 Self {
188 channel: ChannelConfig::default(),
189 name: Some(name.into()),
190 }
191 }
192}
193
194#[derive(Debug, Clone, Default)]
196pub struct SinkConfig {
197 pub channel: ChannelConfig,
199
200 pub name: Option<String>,
202
203 pub max_subscribers: usize,
205}
206
207impl SinkConfig {
208 #[must_use]
210 pub fn with_buffer_size(buffer_size: usize) -> Self {
211 Self {
212 channel: ChannelConfig::with_buffer_size(buffer_size),
213 name: None,
214 max_subscribers: 0,
215 }
216 }
217
218 #[must_use]
220 pub fn named(name: impl Into<String>) -> Self {
221 Self {
222 channel: ChannelConfig::default(),
223 name: Some(name.into()),
224 max_subscribers: 0,
225 }
226 }
227}
228
229#[derive(Debug, Clone, Default)]
231pub struct ChannelStats {
232 pub items_pushed: u64,
234
235 pub items_popped: u64,
237
238 pub push_blocked: u64,
240
241 pub items_dropped: u64,
243
244 pub pop_empty: u64,
246}
247
248impl ChannelStats {
249 #[must_use]
251 pub fn in_flight(&self) -> u64 {
252 self.items_pushed.saturating_sub(self.items_popped)
253 }
254
255 #[must_use]
257 #[allow(clippy::cast_precision_loss)] pub fn drop_rate(&self) -> f64 {
259 if self.items_pushed == 0 {
260 0.0
261 } else {
262 self.items_dropped as f64 / self.items_pushed as f64
263 }
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270
271 #[test]
272 fn test_default_config() {
273 let config = ChannelConfig::default();
274 assert_eq!(config.buffer_size, DEFAULT_BUFFER_SIZE);
275 assert_eq!(config.backpressure, BackpressureStrategy::Block);
276 assert_eq!(config.wait_strategy, WaitStrategy::SpinYield);
277 assert!(!config.track_stats);
278 }
279
280 #[test]
281 fn test_config_builder() {
282 let config = ChannelConfig::builder()
283 .buffer_size(1024)
284 .backpressure(BackpressureStrategy::DropOldest)
285 .wait_strategy(WaitStrategy::Park)
286 .track_stats(true)
287 .build();
288
289 assert_eq!(config.buffer_size, 1024);
290 assert_eq!(config.backpressure, BackpressureStrategy::DropOldest);
291 assert_eq!(config.wait_strategy, WaitStrategy::Park);
292 assert!(config.track_stats);
293 }
294
295 #[test]
296 fn test_effective_buffer_size() {
297 let config = ChannelConfig::with_buffer_size(100);
298 assert_eq!(config.effective_buffer_size(), 128); let config = ChannelConfig::with_buffer_size(1);
301 assert_eq!(
302 config.effective_buffer_size(),
303 MIN_BUFFER_SIZE.next_power_of_two()
304 );
305 }
306
307 #[test]
308 fn test_buffer_size_clamping() {
309 let config = ChannelConfig::with_buffer_size(0);
310 assert_eq!(config.buffer_size, MIN_BUFFER_SIZE);
311
312 let config = ChannelConfig::with_buffer_size(usize::MAX);
313 assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);
314 }
315
316 #[test]
317 fn test_source_config() {
318 let config = SourceConfig::with_buffer_size(512);
319 assert_eq!(config.channel.buffer_size, 512);
320 assert!(config.name.is_none());
321
322 let config = SourceConfig::named("my_source");
323 assert_eq!(config.name.as_deref(), Some("my_source"));
324 }
325
326 #[test]
327 fn test_sink_config() {
328 let config = SinkConfig::with_buffer_size(256);
329 assert_eq!(config.channel.buffer_size, 256);
330 assert!(config.name.is_none());
331 assert_eq!(config.max_subscribers, 0);
332
333 let config = SinkConfig::named("my_sink");
334 assert_eq!(config.name.as_deref(), Some("my_sink"));
335 }
336
337 #[test]
338 fn test_channel_stats() {
339 let mut stats = ChannelStats {
340 items_pushed: 100,
341 ..ChannelStats::default()
342 };
343 stats.items_popped = 80;
344 stats.items_dropped = 5;
345
346 assert_eq!(stats.in_flight(), 20);
347 assert!((stats.drop_rate() - 0.05).abs() < f64::EPSILON);
348 }
349
350 #[test]
351 fn test_channel_stats_empty() {
352 let stats = ChannelStats::default();
353 assert_eq!(stats.in_flight(), 0);
354 assert!((stats.drop_rate() - 0.0).abs() < f64::EPSILON);
355 }
356}