Skip to main content

laminar_core/streaming/
config.rs

1//! Configuration types for channels, sources, and sinks.
2
/// Lower bound for a channel buffer size.
pub const MIN_BUFFER_SIZE: usize = 4;

/// Buffer size a channel gets when none is requested.
pub const DEFAULT_BUFFER_SIZE: usize = 2048;

/// Upper bound for a channel buffer size: 2^20 (about one million) entries.
pub const MAX_BUFFER_SIZE: usize = 1_048_576;
11
/// Backpressure strategy when buffer is full.
///
/// Stored in source/sink configs and exposed in SQL DDL (`BACKPRESSURE = '...'`).
/// The streaming channel always blocks on full; this enum is used by higher-level
/// layers (e.g., catalog snapshot ring) to decide overflow behavior.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackpressureStrategy {
    /// Block until space is available (default).
    #[default]
    Block,
    /// Drop the oldest item to make room.
    DropOldest,
    /// Reject the push immediately.
    Reject,
}

/// Parses a strategy from its textual name, ignoring ASCII case.
///
/// Accepted spellings:
/// - `block`, `blocking` → [`BackpressureStrategy::Block`]
/// - `drop`, `drop_oldest`, `dropoldest` → [`BackpressureStrategy::DropOldest`]
/// - `reject`, `error` → [`BackpressureStrategy::Reject`]
///
/// # Errors
///
/// Returns a message quoting the rejected input and listing the canonical
/// values when `s` matches none of the spellings above.
impl std::str::FromStr for BackpressureStrategy {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Every accepted spelling is plain ASCII, so compare with ASCII-only
        // case folding. This avoids building a lowercased copy of the input
        // and keeps non-ASCII look-alikes (e.g. U+212A KELVIN SIGN, whose
        // Unicode lowercase is `k`) from being accepted as keywords.
        const SPELLINGS: [(&str, BackpressureStrategy); 7] = [
            ("block", BackpressureStrategy::Block),
            ("blocking", BackpressureStrategy::Block),
            ("drop", BackpressureStrategy::DropOldest),
            ("drop_oldest", BackpressureStrategy::DropOldest),
            ("dropoldest", BackpressureStrategy::DropOldest),
            ("reject", BackpressureStrategy::Reject),
            ("error", BackpressureStrategy::Reject),
        ];

        SPELLINGS
            .iter()
            .find(|(spelling, _)| s.eq_ignore_ascii_case(spelling))
            .map(|&(_, strategy)| strategy)
            .ok_or_else(|| {
                format!(
                    "invalid backpressure strategy: '{s}'. Valid values: block, drop_oldest, reject"
                )
            })
    }
}
42
/// Wait strategy — parsed from SQL DDL for forward-compatibility.
/// Currently unused; crossfire handles its own backoff internally.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum WaitStrategy {
    /// Spin-loop.
    Spin,
    /// Spin with yields (default).
    #[default]
    SpinYield,
    /// Park the thread.
    Park,
}

/// Parses a wait strategy from its textual name, ignoring ASCII case.
///
/// Accepted spellings:
/// - `spin` → [`WaitStrategy::Spin`]
/// - `spin_yield`, `spinyield`, `yield` → [`WaitStrategy::SpinYield`]
/// - `park`, `parking` → [`WaitStrategy::Park`]
///
/// # Errors
///
/// Returns a message quoting the rejected input and listing the canonical
/// values when `s` matches none of the spellings above.
impl std::str::FromStr for WaitStrategy {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Every accepted spelling is plain ASCII, so compare with ASCII-only
        // case folding. This avoids building a lowercased copy of the input
        // and keeps non-ASCII look-alikes (e.g. U+212A KELVIN SIGN, whose
        // Unicode lowercase is `k`) from being accepted as keywords.
        const SPELLINGS: [(&str, WaitStrategy); 6] = [
            ("spin", WaitStrategy::Spin),
            ("spin_yield", WaitStrategy::SpinYield),
            ("spinyield", WaitStrategy::SpinYield),
            ("yield", WaitStrategy::SpinYield),
            ("park", WaitStrategy::Park),
            ("parking", WaitStrategy::Park),
        ];

        SPELLINGS
            .iter()
            .find(|(spelling, _)| s.eq_ignore_ascii_case(spelling))
            .map(|&(_, strategy)| strategy)
            .ok_or_else(|| {
                format!("invalid wait strategy: '{s}'. Valid values: spin, spin_yield, park")
            })
    }
}
70
71/// Channel configuration. Only `buffer_size` is used by the channel;
72/// `backpressure` and `wait_strategy` are stored for higher-level consumers.
73#[derive(Debug, Clone)]
74pub struct ChannelConfig {
75    /// Buffer size.
76    pub buffer_size: usize,
77    /// Backpressure strategy (used by catalog/snapshot ring, not the channel).
78    pub backpressure: BackpressureStrategy,
79    /// Wait strategy (reserved for future use).
80    pub wait_strategy: WaitStrategy,
81    /// Whether to track statistics (reserved for future use).
82    pub track_stats: bool,
83}
84
85impl Default for ChannelConfig {
86    fn default() -> Self {
87        Self {
88            buffer_size: DEFAULT_BUFFER_SIZE,
89            backpressure: BackpressureStrategy::Block,
90            wait_strategy: WaitStrategy::SpinYield,
91            track_stats: false,
92        }
93    }
94}
95
96impl ChannelConfig {
97    /// Creates a config with the specified buffer size.
98    #[must_use]
99    pub fn with_buffer_size(buffer_size: usize) -> Self {
100        Self {
101            buffer_size: buffer_size.clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE),
102            ..Default::default()
103        }
104    }
105}
106
107/// Configuration for a Source.
108#[derive(Debug, Clone, Default)]
109pub struct SourceConfig {
110    /// Channel configuration.
111    pub channel: ChannelConfig,
112    /// Name of the source (for debugging/metrics).
113    pub name: Option<String>,
114}
115
116impl SourceConfig {
117    /// Creates a source config with the specified buffer size.
118    #[must_use]
119    pub fn with_buffer_size(buffer_size: usize) -> Self {
120        Self {
121            channel: ChannelConfig::with_buffer_size(buffer_size),
122            name: None,
123        }
124    }
125
126    /// Creates a named source config.
127    #[must_use]
128    pub fn named(name: impl Into<String>) -> Self {
129        Self {
130            channel: ChannelConfig::default(),
131            name: Some(name.into()),
132        }
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// The default channel config uses the default buffer size and strategies.
    #[test]
    fn test_default_config() {
        let config = ChannelConfig::default();
        assert_eq!(config.buffer_size, DEFAULT_BUFFER_SIZE);
        assert_eq!(config.backpressure, BackpressureStrategy::Block);
        assert_eq!(config.wait_strategy, WaitStrategy::SpinYield);
        assert!(!config.track_stats);
    }

    /// Out-of-range sizes are clamped; in-range sizes (including both bounds) are kept.
    #[test]
    fn test_buffer_size_clamping() {
        let config = ChannelConfig::with_buffer_size(0);
        assert_eq!(config.buffer_size, MIN_BUFFER_SIZE);

        let config = ChannelConfig::with_buffer_size(usize::MAX);
        assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);

        let config = ChannelConfig::with_buffer_size(MIN_BUFFER_SIZE);
        assert_eq!(config.buffer_size, MIN_BUFFER_SIZE);

        let config = ChannelConfig::with_buffer_size(MAX_BUFFER_SIZE);
        assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);

        let config = ChannelConfig::with_buffer_size(MAX_BUFFER_SIZE + 1);
        assert_eq!(config.buffer_size, MAX_BUFFER_SIZE);
    }

    /// Both source constructors fill in the field they do not take as input.
    #[test]
    fn test_source_config() {
        let config = SourceConfig::with_buffer_size(512);
        assert_eq!(config.channel.buffer_size, 512);
        assert!(config.name.is_none());

        let config = SourceConfig::named("my_source");
        assert_eq!(config.name.as_deref(), Some("my_source"));
        assert_eq!(config.channel.buffer_size, DEFAULT_BUFFER_SIZE);
    }

    /// Every variant, its aliases, and mixed-case input parse as expected.
    #[test]
    fn test_backpressure_parse() {
        assert_eq!(
            "block".parse::<BackpressureStrategy>().unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            "blocking".parse::<BackpressureStrategy>().unwrap(),
            BackpressureStrategy::Block
        );
        assert_eq!(
            "drop".parse::<BackpressureStrategy>().unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            "DROP_OLDEST".parse::<BackpressureStrategy>().unwrap(),
            BackpressureStrategy::DropOldest
        );
        assert_eq!(
            "reject".parse::<BackpressureStrategy>().unwrap(),
            BackpressureStrategy::Reject
        );
        assert_eq!(
            "error".parse::<BackpressureStrategy>().unwrap(),
            BackpressureStrategy::Reject
        );
        assert!("invalid".parse::<BackpressureStrategy>().is_err());
        assert!("".parse::<BackpressureStrategy>().is_err());
    }

    /// Every wait-strategy variant, its aliases, and mixed-case input parse as expected.
    #[test]
    fn test_wait_strategy_parse() {
        assert_eq!("spin".parse::<WaitStrategy>().unwrap(), WaitStrategy::Spin);
        assert_eq!(
            "spin_yield".parse::<WaitStrategy>().unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!(
            "yield".parse::<WaitStrategy>().unwrap(),
            WaitStrategy::SpinYield
        );
        assert_eq!("PARK".parse::<WaitStrategy>().unwrap(), WaitStrategy::Park);
        assert_eq!(
            "parking".parse::<WaitStrategy>().unwrap(),
            WaitStrategy::Park
        );
        assert!("invalid".parse::<WaitStrategy>().is_err());
    }

    /// Parse errors quote the offending input so DDL mistakes are easy to locate.
    #[test]
    fn test_parse_error_mentions_input() {
        let err = "bogus".parse::<BackpressureStrategy>().unwrap_err();
        assert!(err.contains("'bogus'"));
        assert!(err.contains("block, drop_oldest, reject"));

        let err = "bogus".parse::<WaitStrategy>().unwrap_err();
        assert!(err.contains("'bogus'"));
        assert!(err.contains("spin, spin_yield, park"));
    }
}