Skip to main content

laminar_core/streaming/
config.rs

//! Streaming API configuration types.
//!
//! This module defines configuration types for channels, sources, and sinks,
//! plus the backpressure and wait strategy enums and a channel statistics type.
4
/// Default number of entries in a channel buffer.
///
/// 2048 entries, i.e. the equivalent of 128 KiB of 64-byte cache lines.
pub const DEFAULT_BUFFER_SIZE: usize = 1 << 11;

/// Smallest permitted buffer size, so a channel can hold at least a few items.
pub const MIN_BUFFER_SIZE: usize = 1 << 2;

/// Largest permitted buffer size (1 Mi entries), to prevent excessive memory usage.
pub const MAX_BUFFER_SIZE: usize = 1_048_576;
13
/// Backpressure strategy applied when a channel buffer is full.
///
/// Selects what happens when a producer tries to push to a full channel.
/// Values can be parsed from text through the [`std::str::FromStr`] impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
    /// Block until space is available.
    ///
    /// Best for exactly-once semantics where no data loss is acceptable.
    #[default]
    Block,

    /// Drop the oldest item to make room for the new one.
    ///
    /// Best for real-time systems where freshness matters more than completeness.
    DropOldest,

    /// Reject the push and return an error immediately.
    ///
    /// Lets the caller decide what to do with the rejected item.
    Reject,
}

impl std::str::FromStr for BackpressureStrategy {
    type Err = String;

    /// Parses a strategy name.
    ///
    /// Matching ignores surrounding whitespace and ASCII case, and treats `-`
    /// the same as `_`. Accepted spellings:
    ///
    /// * `block`, `blocking` -> [`BackpressureStrategy::Block`]
    /// * `drop`, `drop_oldest`, `dropoldest` -> [`BackpressureStrategy::DropOldest`]
    /// * `reject`, `error` -> [`BackpressureStrategy::Reject`]
    ///
    /// # Errors
    ///
    /// Returns a message quoting the original (un-normalised) input and listing
    /// the canonical values when the name is not recognised.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Normalise in a single pass. Only ASCII letters are folded, so
        // non-ASCII look-alikes (e.g. U+212A KELVIN SIGN, which Unicode
        // lowercasing maps to `k`) are never mistaken for a keyword.
        let key: String = s
            .trim()
            .chars()
            .map(|c| if c == '-' { '_' } else { c.to_ascii_lowercase() })
            .collect();

        match key.as_str() {
            "block" | "blocking" => Ok(Self::Block),
            "drop" | "drop_oldest" | "dropoldest" => Ok(Self::DropOldest),
            "reject" | "error" => Ok(Self::Reject),
            _ => Err(format!(
                "invalid backpressure strategy: '{s}'. Valid values: block, drop_oldest, reject"
            )),
        }
    }
}
51
/// Wait strategy for consumers when a channel is empty.
///
/// Selects how a consumer waits for data when there is nothing to pop.
/// Values can be parsed from text through the [`std::str::FromStr`] impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
    /// Spin-loop without yielding (lowest latency, highest CPU).
    ///
    /// Best for ultra-low-latency scenarios with dedicated cores.
    Spin,

    /// Spin with occasional thread yields (balanced).
    ///
    /// Good balance between latency and CPU usage.
    #[default]
    SpinYield,

    /// Park the thread and wait for notification (lowest CPU).
    ///
    /// Best for batch processing where latency is less critical.
    Park,
}

impl std::str::FromStr for WaitStrategy {
    type Err = String;

    /// Parses a strategy name.
    ///
    /// Matching ignores surrounding whitespace and ASCII case, and treats `-`
    /// the same as `_`. Accepted spellings:
    ///
    /// * `spin` -> [`WaitStrategy::Spin`]
    /// * `spin_yield`, `spinyield`, `yield` -> [`WaitStrategy::SpinYield`]
    /// * `park`, `parking` -> [`WaitStrategy::Park`]
    ///
    /// # Errors
    ///
    /// Returns a message quoting the original (un-normalised) input and listing
    /// the canonical values when the name is not recognised.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Normalise in a single pass. Only ASCII letters are folded, so
        // non-ASCII look-alikes (e.g. U+212A KELVIN SIGN, which Unicode
        // lowercasing maps to `k`) are never mistaken for a keyword.
        let key: String = s
            .trim()
            .chars()
            .map(|c| if c == '-' { '_' } else { c.to_ascii_lowercase() })
            .collect();

        match key.as_str() {
            "spin" => Ok(Self::Spin),
            "spin_yield" | "spinyield" | "yield" => Ok(Self::SpinYield),
            "park" | "parking" => Ok(Self::Park),
            _ => Err(format!(
                "invalid wait strategy: '{s}'. Valid values: spin, spin_yield, park"
            )),
        }
    }
}
88
89/// Configuration for a streaming channel.
90#[derive(Debug, Clone)]
91pub struct ChannelConfig {
92    /// Buffer size (will be rounded up to power of 2).
93    pub buffer_size: usize,
94
95    /// Backpressure strategy when buffer is full.
96    pub backpressure: BackpressureStrategy,
97
98    /// Wait strategy for consumers.
99    pub wait_strategy: WaitStrategy,
100
101    /// Whether to track statistics (small overhead).
102    pub track_stats: bool,
103}
104
105impl Default for ChannelConfig {
106    fn default() -> Self {
107        Self {
108            buffer_size: DEFAULT_BUFFER_SIZE,
109            backpressure: BackpressureStrategy::Block,
110            wait_strategy: WaitStrategy::SpinYield,
111            track_stats: false,
112        }
113    }
114}
115
116impl ChannelConfig {
117    /// Creates a new channel configuration with the specified buffer size.
118    #[must_use]
119    pub fn with_buffer_size(buffer_size: usize) -> Self {
120        Self {
121            buffer_size: buffer_size.clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
122            ..Default::default()
123        }
124    }
125
126    /// Creates a builder for custom configuration.
127    #[must_use]
128    pub fn builder() -> ChannelConfigBuilder {
129        ChannelConfigBuilder::default()
130    }
131
132    /// Returns the effective buffer size (rounded to power of 2).
133    #[must_use]
134    pub fn effective_buffer_size(&self) -> usize {
135        self.buffer_size
136            .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE)
137            .next_power_of_two()
138    }
139}
140
141/// Builder for `ChannelConfig`.
142#[derive(Debug, Default)]
143pub struct ChannelConfigBuilder {
144    buffer_size: Option<usize>,
145    backpressure: Option<BackpressureStrategy>,
146    wait_strategy: Option<WaitStrategy>,
147    track_stats: Option<bool>,
148}
149
150impl ChannelConfigBuilder {
151    /// Sets the buffer size.
152    #[must_use]
153    pub fn buffer_size(mut self, size: usize) -> Self {
154        self.buffer_size = Some(size);
155        self
156    }
157
158    /// Sets the backpressure strategy.
159    #[must_use]
160    pub fn backpressure(mut self, strategy: BackpressureStrategy) -> Self {
161        self.backpressure = Some(strategy);
162        self
163    }
164
165    /// Sets the wait strategy.
166    #[must_use]
167    pub fn wait_strategy(mut self, strategy: WaitStrategy) -> Self {
168        self.wait_strategy = Some(strategy);
169        self
170    }
171
172    /// Enables statistics tracking.
173    #[must_use]
174    pub fn track_stats(mut self, enabled: bool) -> Self {
175        self.track_stats = Some(enabled);
176        self
177    }
178
179    /// Builds the configuration.
180    #[must_use]
181    pub fn build(self) -> ChannelConfig {
182        ChannelConfig {
183            buffer_size: self
184                .buffer_size
185                .unwrap_or(DEFAULT_BUFFER_SIZE)
186                .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
187            backpressure: self.backpressure.unwrap_or_default(),
188            wait_strategy: self.wait_strategy.unwrap_or_default(),
189            track_stats: self.track_stats.unwrap_or(false),
190        }
191    }
192}
193
194/// Configuration for a Source.
195#[derive(Debug, Clone, Default)]
196pub struct SourceConfig {
197    /// Channel configuration.
198    pub channel: ChannelConfig,
199
200    /// Name of the source (for debugging/metrics).
201    pub name: Option<String>,
202}
203
204impl SourceConfig {
205    /// Creates a new source configuration with the specified buffer size.
206    #[must_use]
207    pub fn with_buffer_size(buffer_size: usize) -> Self {
208        Self {
209            channel: ChannelConfig::with_buffer_size(buffer_size),
210            name: None,
211        }
212    }
213
214    /// Creates a named source configuration.
215    #[must_use]
216    pub fn named(name: impl Into<String>) -> Self {
217        Self {
218            channel: ChannelConfig::default(),
219            name: Some(name.into()),
220        }
221    }
222}
223
224/// Configuration for a Sink.
225#[derive(Debug, Clone, Default)]
226pub struct SinkConfig {
227    /// Channel configuration for each subscriber.
228    pub channel: ChannelConfig,
229
230    /// Name of the sink (for debugging/metrics).
231    pub name: Option<String>,
232
233    /// Maximum number of subscribers (0 = unlimited).
234    pub max_subscribers: usize,
235}
236
237impl SinkConfig {
238    /// Creates a new sink configuration with the specified buffer size.
239    #[must_use]
240    pub fn with_buffer_size(buffer_size: usize) -> Self {
241        Self {
242            channel: ChannelConfig::with_buffer_size(buffer_size),
243            name: None,
244            max_subscribers: 0,
245        }
246    }
247
248    /// Creates a named sink configuration.
249    #[must_use]
250    pub fn named(name: impl Into<String>) -> Self {
251        Self {
252            channel: ChannelConfig::default(),
253            name: Some(name.into()),
254            max_subscribers: 0,
255        }
256    }
257}
258
/// Counters describing the activity of a channel.
#[derive(Debug, Clone, Default)]
pub struct ChannelStats {
    /// Total number of items pushed.
    pub items_pushed: u64,

    /// Total number of items popped.
    pub items_popped: u64,

    /// Number of times a push was blocked because the buffer was full.
    pub push_blocked: u64,

    /// Number of items dropped due to backpressure.
    pub items_dropped: u64,

    /// Number of times a pop found the buffer empty.
    pub pop_empty: u64,
}

impl ChannelStats {
    /// Returns `items_pushed - items_popped`, saturating at zero.
    ///
    /// Dropped items are not subtracted.
    #[must_use]
    pub fn in_flight(&self) -> u64 {
        let Self {
            items_pushed,
            items_popped,
            ..
        } = self;
        items_pushed.saturating_sub(*items_popped)
    }

    /// Returns `items_dropped / items_pushed`, or `0.0` when nothing was pushed.
    ///
    /// The ratio is not clamped: it stays within `0.0..=1.0` only while
    /// `items_dropped <= items_pushed`.
    #[must_use]
    #[allow(clippy::cast_precision_loss)] // Stats are approximate, precision loss is acceptable
    pub fn drop_rate(&self) -> f64 {
        match self.items_pushed {
            0 => 0.0,
            pushed => self.items_dropped as f64 / pushed as f64,
        }
    }
}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// `ChannelConfig::default()` exposes the module-level defaults.
    #[test]
    fn test_default_config() {
        let ChannelConfig {
            buffer_size,
            backpressure,
            wait_strategy,
            track_stats,
        } = ChannelConfig::default();

        assert_eq!(buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(backpressure, BackpressureStrategy::Block);
        assert_eq!(wait_strategy, WaitStrategy::SpinYield);
        assert!(!track_stats);
    }

    /// Every builder setter is reflected in the built configuration.
    #[test]
    fn test_config_builder() {
        let built = ChannelConfig::builder()
            .buffer_size(1024)
            .backpressure(BackpressureStrategy::DropOldest)
            .wait_strategy(WaitStrategy::Park)
            .track_stats(true)
            .build();

        assert_eq!(built.buffer_size, 1024);
        assert_eq!(built.backpressure, BackpressureStrategy::DropOldest);
        assert_eq!(built.wait_strategy, WaitStrategy::Park);
        assert!(built.track_stats);
    }

    /// The effective size is rounded up to a power of two after clamping.
    #[test]
    fn test_effective_buffer_size() {
        let rounded = ChannelConfig::with_buffer_size(100).effective_buffer_size();
        assert_eq!(rounded, 128); // Next power of 2

        let smallest = ChannelConfig::with_buffer_size(1).effective_buffer_size();
        assert_eq!(smallest, MIN_BUFFER_SIZE.next_power_of_two());
    }

    /// Out-of-range requests are clamped to the module limits.
    #[test]
    fn test_buffer_size_clamping() {
        let cases = [(0, MIN_BUFFER_SIZE), (usize::MAX, MAX_BUFFER_SIZE)];
        for (requested, expected) in cases {
            let config = ChannelConfig::with_buffer_size(requested);
            assert_eq!(config.buffer_size, expected);
        }
    }

    /// Both `SourceConfig` constructors populate the expected fields.
    #[test]
    fn test_source_config() {
        let sized = SourceConfig::with_buffer_size(512);
        assert_eq!(sized.channel.buffer_size, 512);
        assert!(sized.name.is_none());

        let named = SourceConfig::named("my_source");
        assert_eq!(named.name.as_deref(), Some("my_source"));
    }

    /// Both `SinkConfig` constructors populate the expected fields.
    #[test]
    fn test_sink_config() {
        let sized = SinkConfig::with_buffer_size(256);
        assert_eq!(sized.channel.buffer_size, 256);
        assert!(sized.name.is_none());
        assert_eq!(sized.max_subscribers, 0);

        let named = SinkConfig::named("my_sink");
        assert_eq!(named.name.as_deref(), Some("my_sink"));
    }

    /// Derived metrics are computed from the raw counters.
    #[test]
    fn test_channel_stats() {
        let stats = ChannelStats {
            items_pushed: 100,
            items_popped: 80,
            items_dropped: 5,
            ..ChannelStats::default()
        };

        assert_eq!(stats.in_flight(), 20);
        assert!((stats.drop_rate() - 0.05).abs() < f64::EPSILON);
    }

    /// All-zero counters yield zero for both derived metrics.
    #[test]
    fn test_channel_stats_empty() {
        let stats = ChannelStats::default();

        assert_eq!(stats.in_flight(), 0);
        assert!((stats.drop_rate() - 0.0).abs() < f64::EPSILON);
    }
}