Skip to main content

laminar_core/streaming/
config.rs

//! Streaming API configuration types.
//!
//! This module defines configuration types for channels, sources, and sinks,
//! along with `ChannelStats`, the statistics counters for a channel.
4
/// Default buffer size for channels, in entries.
///
/// 2048 entries is 128 KiB worth of 64-byte cache lines.
pub const DEFAULT_BUFFER_SIZE: usize = 2048;

/// Minimum buffer size, in entries (must hold at least a few items).
pub const MIN_BUFFER_SIZE: usize = 4;

/// Maximum buffer size, in entries (prevents excessive memory usage).
pub const MAX_BUFFER_SIZE: usize = 1 << 20; // 1M entries

// `usize::clamp(min, max)` panics when `min > max`, and `ChannelConfig` clamps
// against these bounds. Reject an inconsistent edit of the constants at compile
// time instead of at the first call.
const _: () = assert!(
    MIN_BUFFER_SIZE <= DEFAULT_BUFFER_SIZE && DEFAULT_BUFFER_SIZE <= MAX_BUFFER_SIZE
);
13
/// Backpressure strategy when buffer is full.
///
/// This determines what happens when a producer tries to push
/// to a full channel. The default is [`BackpressureStrategy::Block`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum BackpressureStrategy {
    /// Block until space is available.
    ///
    /// Best for exactly-once semantics where no data loss is acceptable.
    #[default]
    Block,

    /// Drop the oldest item to make room for the new one.
    ///
    /// Best for real-time systems where freshness matters more than completeness.
    DropOldest,

    /// Reject the push and return an error immediately.
    ///
    /// Lets the caller decide what to do with the rejected item.
    Reject,
}
36
/// Wait strategy for consumers when channel is empty.
///
/// This determines how a consumer waits for data when the channel is empty.
/// The default is [`WaitStrategy::SpinYield`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum WaitStrategy {
    /// Spin-loop without yielding (lowest latency, highest CPU).
    ///
    /// Best for ultra-low-latency scenarios with dedicated cores.
    Spin,

    /// Spin with occasional thread yields (balanced).
    ///
    /// Good balance between latency and CPU usage.
    #[default]
    SpinYield,

    /// Park the thread and wait for notification (lowest CPU).
    ///
    /// Best for batch processing where latency is less critical.
    Park,
}
58
59/// Configuration for a streaming channel.
60#[derive(Debug, Clone)]
61pub struct ChannelConfig {
62    /// Buffer size (will be rounded up to power of 2).
63    pub buffer_size: usize,
64
65    /// Backpressure strategy when buffer is full.
66    pub backpressure: BackpressureStrategy,
67
68    /// Wait strategy for consumers.
69    pub wait_strategy: WaitStrategy,
70
71    /// Whether to track statistics (small overhead).
72    pub track_stats: bool,
73}
74
75impl Default for ChannelConfig {
76    fn default() -> Self {
77        Self {
78            buffer_size: DEFAULT_BUFFER_SIZE,
79            backpressure: BackpressureStrategy::Block,
80            wait_strategy: WaitStrategy::SpinYield,
81            track_stats: false,
82        }
83    }
84}
85
86impl ChannelConfig {
87    /// Creates a new channel configuration with the specified buffer size.
88    #[must_use]
89    pub fn with_buffer_size(buffer_size: usize) -> Self {
90        Self {
91            buffer_size: buffer_size.clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
92            ..Default::default()
93        }
94    }
95
96    /// Creates a builder for custom configuration.
97    #[must_use]
98    pub fn builder() -> ChannelConfigBuilder {
99        ChannelConfigBuilder::default()
100    }
101
102    /// Returns the effective buffer size (rounded to power of 2).
103    #[must_use]
104    pub fn effective_buffer_size(&self) -> usize {
105        self.buffer_size
106            .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE)
107            .next_power_of_two()
108    }
109}
110
111/// Builder for `ChannelConfig`.
112#[derive(Debug, Default)]
113pub struct ChannelConfigBuilder {
114    buffer_size: Option<usize>,
115    backpressure: Option<BackpressureStrategy>,
116    wait_strategy: Option<WaitStrategy>,
117    track_stats: Option<bool>,
118}
119
120impl ChannelConfigBuilder {
121    /// Sets the buffer size.
122    #[must_use]
123    pub fn buffer_size(mut self, size: usize) -> Self {
124        self.buffer_size = Some(size);
125        self
126    }
127
128    /// Sets the backpressure strategy.
129    #[must_use]
130    pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
131        self.backpressure = Some(strategy);
132        self
133    }
134
135    /// Sets the wait strategy.
136    #[must_use]
137    pub fn wait_strategy(mut self, strategy: WaitStrategy) -> Self {
138        self.wait_strategy = Some(strategy);
139        self
140    }
141
142    /// Enables statistics tracking.
143    #[must_use]
144    pub fn track_stats(mut self, enabled: bool) -> Self {
145        self.track_stats = Some(enabled);
146        self
147    }
148
149    /// Builds the configuration.
150    #[must_use]
151    pub fn build(self) -> ChannelConfig {
152        ChannelConfig {
153            buffer_size: self
154                .buffer_size
155                .unwrap_or(DEFAULT_BUFFER_SIZE)
156                .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
157            backpressure: self.backpressure.unwrap_or_default(),
158            wait_strategy: self.wait_strategy.unwrap_or_default(),
159            track_stats: self.track_stats.unwrap_or(false),
160        }
161    }
162}
163
164/// Configuration for a Source.
165#[derive(Debug, Clone, Default)]
166pub struct SourceConfig {
167    /// Channel configuration.
168    pub channel: ChannelConfig,
169
170    /// Name of the source (for debugging/metrics).
171    pub name: Option<String>,
172}
173
174impl SourceConfig {
175    /// Creates a new source configuration with the specified buffer size.
176    #[must_use]
177    pub fn with_buffer_size(buffer_size: usize) -> Self {
178        Self {
179            channel: ChannelConfig::with_buffer_size(buffer_size),
180            name: None,
181        }
182    }
183
184    /// Creates a named source configuration.
185    #[must_use]
186    pub fn named(name: impl Into<String>) -> Self {
187        Self {
188            channel: ChannelConfig::default(),
189            name: Some(name.into()),
190        }
191    }
192}
193
194/// Configuration for a Sink.
195#[derive(Debug, Clone, Default)]
196pub struct SinkConfig {
197    /// Channel configuration for each subscriber.
198    pub channel: ChannelConfig,
199
200    /// Name of the sink (for debugging/metrics).
201    pub name: Option<String>,
202
203    /// Maximum number of subscribers (0 = unlimited).
204    pub max_subscribers: usize,
205}
206
207impl SinkConfig {
208    /// Creates a new sink configuration with the specified buffer size.
209    #[must_use]
210    pub fn with_buffer_size(buffer_size: usize) -> Self {
211        Self {
212            channel: ChannelConfig::with_buffer_size(buffer_size),
213            name: None,
214            max_subscribers: 0,
215        }
216    }
217
218    /// Creates a named sink configuration.
219    #[must_use]
220    pub fn named(name: impl Into<String>) -> Self {
221        Self {
222            channel: ChannelConfig::default(),
223            name: Some(name.into()),
224            max_subscribers: 0,
225        }
226    }
227}
228
/// Statistics for a channel.
///
/// A plain bundle of counters; every counter starts at zero in the derived
/// `Default`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ChannelStats {
    /// Total items pushed.
    pub items_pushed: u64,

    /// Total items popped.
    pub items_popped: u64,

    /// Times push was blocked due to full buffer.
    pub push_blocked: u64,

    /// Items dropped due to backpressure.
    pub items_dropped: u64,

    /// Times pop found empty buffer.
    pub pop_empty: u64,
}

impl ChannelStats {
    /// Returns the number of items currently in flight.
    ///
    /// Computed as `items_pushed - items_popped`, saturating at zero if more
    /// pops than pushes were recorded. `items_dropped` is not subtracted.
    #[must_use]
    pub fn in_flight(&self) -> u64 {
        self.items_pushed.saturating_sub(self.items_popped)
    }

    /// Returns the drop rate: `items_dropped / items_pushed`.
    ///
    /// Returns `0.0` when nothing has been pushed. The ratio is not clamped, so
    /// it stays within `0.0..=1.0` only while `items_dropped <= items_pushed`.
    #[must_use]
    #[allow(clippy::cast_precision_loss)] // Stats are approximate, precision loss is acceptable
    pub fn drop_rate(&self) -> f64 {
        // Guard the division: an untouched channel has a drop rate of zero.
        if self.items_pushed == 0 {
            return 0.0;
        }
        self.items_dropped as f64 / self.items_pushed as f64
    }
}
266
#[cfg(test)]
mod tests {
    //! Unit tests for the configuration and statistics types in this module.

    use super::*;

    #[test]
    fn test_default_config() {
        let config = ChannelConfig::default();
        assert_eq!(config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(config.backpressure, BackpressureStrategy::Block);
        assert_eq!(config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!config.track_stats);
    }

    #[test]
    fn test_config_builder() {
        let config = ChannelConfig::builder()
            .buffer_size(1024)
            .backpressure(BackpressureStrategy::DropOldest)
            .wait_strategy(WaitStrategy::Park)
            .track_stats(true)
            .build();

        assert_eq!(config.buffer_size, 1024);
        assert_eq!(config.backpressure, BackpressureStrategy::DropOldest);
        assert_eq!(config.wait_strategy, WaitStrategy::Park);
        assert!(config.track_stats);
    }

    // An untouched builder must produce the same values as `Default`.
    #[test]
    fn test_config_builder_defaults() {
        let built = ChannelConfig::builder().build();
        let default = ChannelConfig::default();

        assert_eq!(built.buffer_size, default.buffer_size);
        assert_eq!(built.backpressure, default.backpressure);
        assert_eq!(built.wait_strategy, default.wait_strategy);
        assert_eq!(built.track_stats, default.track_stats);
    }

    // The builder clamps out-of-range sizes just like `with_buffer_size`.
    #[test]
    fn test_config_builder_clamping() {
        let config = ChannelConfig::builder().buffer_size(0).build();
        assert_eq!(config.buffer_size, MIN_BUFFER_SIZE);

        let config = ChannelConfig::builder().buffer_size(usize::MAX).build();
        assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);
    }

    #[test]
    fn test_effective_buffer_size() {
        let config = ChannelConfig::with_buffer_size(100);
        assert_eq!(config.effective_buffer_size(), 128); // Next power of 2

        let config = ChannelConfig::with_buffer_size(1);
        assert_eq!(
            config.effective_buffer_size(),
            MIN_BUFFER_SIZE.next_power_of_two()
        );

        // The field is public, so it can bypass the constructor's clamping;
        // the effective size must still stay within bounds.
        let config = ChannelConfig {
            buffer_size: usize::MAX,
            ..ChannelConfig::default()
        };
        assert_eq!(config.effective_buffer_size(), MAX_BUFFER_SIZE);
    }

    #[test]
    fn test_buffer_size_clamping() {
        let config = ChannelConfig::with_buffer_size(0);
        assert_eq!(config.buffer_size, MIN_BUFFER_SIZE);

        let config = ChannelConfig::with_buffer_size(usize::MAX);
        assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);
    }

    #[test]
    fn test_source_config() {
        let config = SourceConfig::with_buffer_size(512);
        assert_eq!(config.channel.buffer_size, 512);
        assert!(config.name.is_none());

        let config = SourceConfig::named("my_source");
        assert_eq!(config.name.as_deref(), Some("my_source"));
        assert_eq!(config.channel.buffer_size, DEFAULT_BUFFER_SIZE);
    }

    #[test]
    fn test_sink_config() {
        let config = SinkConfig::with_buffer_size(256);
        assert_eq!(config.channel.buffer_size, 256);
        assert!(config.name.is_none());
        assert_eq!(config.max_subscribers, 0);

        let config = SinkConfig::named("my_sink");
        assert_eq!(config.name.as_deref(), Some("my_sink"));
        assert_eq!(config.channel.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(config.max_subscribers, 0);
    }

    #[test]
    fn test_channel_stats() {
        let stats = ChannelStats {
            items_pushed: 100,
            items_popped: 80,
            items_dropped: 5,
            ..ChannelStats::default()
        };

        assert_eq!(stats.in_flight(), 20);
        assert!((stats.drop_rate() - 0.05).abs() < f64::EPSILON);
    }

    #[test]
    fn test_channel_stats_empty() {
        let stats = ChannelStats::default();
        assert_eq!(stats.in_flight(), 0);
        assert!((stats.drop_rate() - 0.0).abs() < f64::EPSILON);
    }

    // More pops than pushes must saturate rather than underflow.
    #[test]
    fn test_channel_stats_in_flight_saturates() {
        let stats = ChannelStats {
            items_pushed: 1,
            items_popped: 3,
            ..ChannelStats::default()
        };
        assert_eq!(stats.in_flight(), 0);
    }
}