Skip to main content

varpulis_runtime/
backpressure.rs

//! Backpressure strategies for per-stage buffer management.
//!
//! Provides configurable buffering strategies for event pipelines,
//! replacing hardcoded channel capacities with explicit policies.
//!
//! # Strategies
//!
//! - [`WhenFull::Block`] — block the sender until space is available (default)
//! - [`WhenFull::DropNewest`] — drop the incoming event when the buffer is full
//! - [`WhenFull::DropOldest`] — drop the oldest buffered event to make room
//! - [`WhenFull::Overflow`] — spill into a secondary buffer (currently handled like `Block`)
12
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15
16use serde::{Deserialize, Serialize};
17use tokio::sync::mpsc;
18
19use crate::event::SharedEvent;
20
21/// Strategy applied when a stage buffer is full.
22#[derive(Debug, Clone, Default, Serialize, Deserialize)]
23#[serde(tag = "strategy", rename_all = "snake_case")]
24pub enum WhenFull {
25    /// Block the sender until space is available.
26    #[default]
27    Block,
28    /// Drop the newest (incoming) event.
29    DropNewest,
30    /// Drop the oldest buffered event to make room.
31    DropOldest,
32    /// Overflow into a secondary buffer of the given capacity.
33    Overflow { secondary_capacity: usize },
34}
35
36/// Configuration for a stage buffer.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct StageBufferConfig {
39    /// Maximum number of events the buffer can hold.
40    #[serde(default = "default_capacity")]
41    pub capacity: usize,
42    /// Strategy when the buffer is full.
43    #[serde(default)]
44    pub when_full: WhenFull,
45}
46
47const fn default_capacity() -> usize {
48    1000
49}
50
51impl Default for StageBufferConfig {
52    fn default() -> Self {
53        Self {
54            capacity: default_capacity(),
55            when_full: WhenFull::Block,
56        }
57    }
58}
59
/// Counters describing the traffic seen by a stage buffer.
///
/// Every counter is an [`AtomicU64`], so the struct can be updated through a
/// shared reference. `Default` is derived; an `AtomicU64` defaults to zero, so
/// `StageBufferMetrics::default()` and [`StageBufferMetrics::new`] agree.
#[derive(Debug, Default)]
pub struct StageBufferMetrics {
    /// Events handed to the buffer's `send`, counted before the strategy runs.
    pub events_received: AtomicU64,
    /// Events the drop strategies counted as dropped.
    pub events_dropped: AtomicU64,
    /// Blocking sends that observed zero free capacity just before sending.
    pub blocks_total: AtomicU64,
    /// Current buffer depth.
    ///
    /// NOTE(review): nothing in the visible code writes this counter — confirm
    /// whether another module maintains it.
    pub current_depth: AtomicU64,
}

impl StageBufferMetrics {
    /// Creates a metrics set with every counter at zero.
    ///
    /// Unlike `Default::default`, this is a `const fn` and can be used in
    /// constant contexts.
    pub const fn new() -> Self {
        Self {
            events_received: AtomicU64::new(0),
            events_dropped: AtomicU64::new(0),
            blocks_total: AtomicU64::new(0),
            current_depth: AtomicU64::new(0),
        }
    }
}
85
86/// A stage buffer that wraps an mpsc sender with a backpressure strategy.
87#[derive(Debug)]
88pub struct StageBuffer {
89    config: StageBufferConfig,
90    tx: mpsc::Sender<SharedEvent>,
91    metrics: Arc<StageBufferMetrics>,
92}
93
94impl StageBuffer {
95    /// Create a new stage buffer with the given config.
96    ///
97    /// Returns `(StageBuffer, mpsc::Receiver<SharedEvent>)`.
98    pub fn new(config: StageBufferConfig) -> (Self, mpsc::Receiver<SharedEvent>) {
99        let (tx, rx) = mpsc::channel(config.capacity);
100        let metrics = Arc::new(StageBufferMetrics::new());
101        (
102            Self {
103                config,
104                tx,
105                metrics,
106            },
107            rx,
108        )
109    }
110
111    /// Create a stage buffer that wraps an existing sender.
112    pub fn wrap(config: StageBufferConfig, tx: mpsc::Sender<SharedEvent>) -> Self {
113        Self {
114            config,
115            tx,
116            metrics: Arc::new(StageBufferMetrics::new()),
117        }
118    }
119
120    /// Send an event through the buffer, applying the configured strategy.
121    pub async fn send(&self, event: SharedEvent) -> Result<(), BackpressureError> {
122        self.metrics.events_received.fetch_add(1, Ordering::Relaxed);
123
124        match &self.config.when_full {
125            WhenFull::Block => {
126                if self.tx.capacity() == 0 {
127                    self.metrics.blocks_total.fetch_add(1, Ordering::Relaxed);
128                }
129                self.tx
130                    .send(event)
131                    .await
132                    .map_err(|_| BackpressureError::ChannelClosed)?;
133            }
134            WhenFull::DropNewest => match self.tx.try_send(event) {
135                Ok(()) => {}
136                Err(mpsc::error::TrySendError::Full(_)) => {
137                    self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
138                }
139                Err(mpsc::error::TrySendError::Closed(_)) => {
140                    return Err(BackpressureError::ChannelClosed);
141                }
142            },
143            WhenFull::DropOldest => {
144                // If full, drain one and send
145                if self.tx.try_send(event.clone()).is_err() {
146                    // Channel is full — the oldest event is lost
147                    self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
148                    // Force-send by blocking (the receiver will have consumed one)
149                    let _ = self.tx.send(event).await;
150                }
151            }
152            WhenFull::Overflow {
153                secondary_capacity: _,
154            } => {
155                // Phase 1: treat overflow as block (full overflow support in Phase 2)
156                if self.tx.capacity() == 0 {
157                    self.metrics.blocks_total.fetch_add(1, Ordering::Relaxed);
158                }
159                self.tx
160                    .send(event)
161                    .await
162                    .map_err(|_| BackpressureError::ChannelClosed)?;
163            }
164        }
165
166        Ok(())
167    }
168
169    /// Get a reference to the metrics.
170    pub const fn metrics(&self) -> &Arc<StageBufferMetrics> {
171        &self.metrics
172    }
173
174    /// Get a clone of the underlying sender.
175    pub fn sender(&self) -> mpsc::Sender<SharedEvent> {
176        self.tx.clone()
177    }
178}
179
180/// Errors from backpressure operations.
181#[derive(Debug, thiserror::Error)]
182pub enum BackpressureError {
183    #[error("channel closed")]
184    ChannelClosed,
185}
186
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;

    /// A blocking buffer with free space delivers the event and counts it.
    #[tokio::test]
    async fn test_stage_buffer_block_strategy() {
        let config = StageBufferConfig {
            capacity: 10,
            when_full: WhenFull::Block,
        };
        let (buffer, mut rx) = StageBuffer::new(config);

        let event = Arc::new(Event::new("Test"));
        buffer.send(event).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received.event_type.as_ref(), "Test");
        assert_eq!(buffer.metrics().events_received.load(Ordering::Relaxed), 1);
        assert_eq!(buffer.metrics().events_dropped.load(Ordering::Relaxed), 0);
    }

    /// With `DropNewest`, the event arriving at a full buffer is discarded and
    /// the earlier events stay queued in order.
    #[tokio::test]
    async fn test_stage_buffer_drop_newest() {
        let config = StageBufferConfig {
            capacity: 2,
            when_full: WhenFull::DropNewest,
        };
        let (buffer, mut rx) = StageBuffer::new(config);

        // Fill the buffer
        buffer.send(Arc::new(Event::new("A"))).await.unwrap();
        buffer.send(Arc::new(Event::new("B"))).await.unwrap();

        // This should be dropped
        buffer.send(Arc::new(Event::new("C"))).await.unwrap();

        assert_eq!(buffer.metrics().events_received.load(Ordering::Relaxed), 3);
        assert_eq!(buffer.metrics().events_dropped.load(Ordering::Relaxed), 1);

        // Only the first two events were retained.
        assert_eq!(rx.try_recv().unwrap().event_type.as_ref(), "A");
        assert_eq!(rx.try_recv().unwrap().event_type.as_ref(), "B");
        assert!(rx.try_recv().is_err());
    }

    /// Sending after the receiver is gone reports `ChannelClosed`.
    #[tokio::test]
    async fn test_stage_buffer_closed_channel() {
        for when_full in [WhenFull::Block, WhenFull::DropNewest] {
            let (buffer, rx) = StageBuffer::new(StageBufferConfig {
                capacity: 1,
                when_full,
            });
            drop(rx);

            let result = buffer.send(Arc::new(Event::new("Late"))).await;
            assert!(matches!(result, Err(BackpressureError::ChannelClosed)));
        }
    }

    // Nothing is awaited here, so a plain `#[test]` avoids starting a runtime.
    #[test]
    fn test_stage_buffer_default_config() {
        let config = StageBufferConfig::default();
        assert_eq!(config.capacity, 1000);
        assert!(matches!(config.when_full, WhenFull::Block));
    }

    #[test]
    fn test_when_full_serialization() {
        let block = WhenFull::Block;
        let json = serde_json::to_string(&block).unwrap();
        assert!(json.contains("block"));

        let overflow = WhenFull::Overflow {
            secondary_capacity: 500,
        };
        let json = serde_json::to_string(&overflow).unwrap();
        assert!(json.contains("overflow"));
        assert!(json.contains("500"));
    }

    #[test]
    fn test_stage_buffer_config_serialization() {
        let config = StageBufferConfig {
            capacity: 2000,
            when_full: WhenFull::DropNewest,
        };
        let json = serde_json::to_string(&config).unwrap();
        let deserialized: StageBufferConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.capacity, 2000);
        assert!(matches!(deserialized.when_full, WhenFull::DropNewest));
    }

    /// Missing fields fall back to the serde defaults declared on the struct.
    #[test]
    fn test_stage_buffer_config_deserialization_defaults() {
        let empty: StageBufferConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(empty.capacity, 1000);
        assert!(matches!(empty.when_full, WhenFull::Block));

        let partial: StageBufferConfig =
            serde_json::from_str(r#"{"when_full":{"strategy":"drop_oldest"}}"#).unwrap();
        assert_eq!(partial.capacity, 1000);
        assert!(matches!(partial.when_full, WhenFull::DropOldest));
    }
}