// varpulis_runtime/backpressure.rs

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;

use crate::event::SharedEvent;

/// Strategy applied by [`StageBuffer::send`] when the stage's channel is full.
///
/// Serialized internally tagged, e.g. `{"strategy": "drop_newest"}`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(tag = "strategy", rename_all = "snake_case")]
pub enum WhenFull {
    /// Apply backpressure: wait until the channel has room (the default).
    #[default]
    Block,
    /// Discard the incoming event and keep everything already queued.
    DropNewest,
    /// Count a drop for the incoming event, then wait for room.
    /// NOTE(review): a sender cannot evict queued items, so this does not
    /// literally discard the oldest entry — confirm intended semantics.
    DropOldest,
    /// Spill into a secondary buffer of the given capacity.
    /// NOTE(review): `secondary_capacity` is ignored by `send` in this file;
    /// the strategy currently behaves exactly like `Block`.
    Overflow { secondary_capacity: usize },
}

/// Per-stage buffer settings: channel capacity plus the when-full strategy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageBufferConfig {
    /// Bounded-channel capacity in events; defaults to 1000 when absent.
    #[serde(default = "default_capacity")]
    pub capacity: usize,
    /// Behavior when the channel is full; defaults to [`WhenFull::Block`].
    #[serde(default)]
    pub when_full: WhenFull,
}

/// Default channel capacity used when `capacity` is missing from the config.
const fn default_capacity() -> usize {
    1000
}

51impl Default for StageBufferConfig {
52 fn default() -> Self {
53 Self {
54 capacity: default_capacity(),
55 when_full: WhenFull::Block,
56 }
57 }
58}
59
/// Lock-free counters describing a stage buffer's traffic.
///
/// All fields are updated with relaxed atomics; the first three are
/// monotonically increasing counters, `current_depth` is a gauge.
#[derive(Debug)]
pub struct StageBufferMetrics {
    /// Total events handed to `send`, whether queued or dropped.
    pub events_received: AtomicU64,
    /// Events discarded by a drop strategy.
    pub events_dropped: AtomicU64,
    /// Times a send found the channel full and had to wait.
    pub blocks_total: AtomicU64,
    /// Best-effort gauge of queued events — verify which component keeps
    /// this up to date before relying on it.
    pub current_depth: AtomicU64,
}

impl StageBufferMetrics {
    /// Creates a metrics block with every counter at zero.
    ///
    /// `const` so it can also be used in static initializers.
    pub const fn new() -> Self {
        Self {
            events_received: AtomicU64::new(0),
            events_dropped: AtomicU64::new(0),
            blocks_total: AtomicU64::new(0),
            current_depth: AtomicU64::new(0),
        }
    }
}

impl Default for StageBufferMetrics {
    /// Equivalent to [`StageBufferMetrics::new`]: all counters start at zero.
    fn default() -> Self {
        Self::new()
    }
}

/// Instrumented sending half of a bounded inter-stage event channel.
///
/// Applies the configured [`WhenFull`] strategy on each `send` and records
/// traffic in a shared [`StageBufferMetrics`].
#[derive(Debug)]
pub struct StageBuffer {
    // Capacity and when-full strategy for this stage.
    config: StageBufferConfig,
    // Sending half of the bounded channel to the downstream stage.
    tx: mpsc::Sender<SharedEvent>,
    // Arc so callers can retain a metrics handle independently of the buffer.
    metrics: Arc<StageBufferMetrics>,
}

94impl StageBuffer {
95 pub fn new(config: StageBufferConfig) -> (Self, mpsc::Receiver<SharedEvent>) {
99 let (tx, rx) = mpsc::channel(config.capacity);
100 let metrics = Arc::new(StageBufferMetrics::new());
101 (
102 Self {
103 config,
104 tx,
105 metrics,
106 },
107 rx,
108 )
109 }
110
111 pub fn wrap(config: StageBufferConfig, tx: mpsc::Sender<SharedEvent>) -> Self {
113 Self {
114 config,
115 tx,
116 metrics: Arc::new(StageBufferMetrics::new()),
117 }
118 }
119
120 pub async fn send(&self, event: SharedEvent) -> Result<(), BackpressureError> {
122 self.metrics.events_received.fetch_add(1, Ordering::Relaxed);
123
124 match &self.config.when_full {
125 WhenFull::Block => {
126 if self.tx.capacity() == 0 {
127 self.metrics.blocks_total.fetch_add(1, Ordering::Relaxed);
128 }
129 self.tx
130 .send(event)
131 .await
132 .map_err(|_| BackpressureError::ChannelClosed)?;
133 }
134 WhenFull::DropNewest => match self.tx.try_send(event) {
135 Ok(()) => {}
136 Err(mpsc::error::TrySendError::Full(_)) => {
137 self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
138 }
139 Err(mpsc::error::TrySendError::Closed(_)) => {
140 return Err(BackpressureError::ChannelClosed);
141 }
142 },
143 WhenFull::DropOldest => {
144 if self.tx.try_send(event.clone()).is_err() {
146 self.metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
148 let _ = self.tx.send(event).await;
150 }
151 }
152 WhenFull::Overflow {
153 secondary_capacity: _,
154 } => {
155 if self.tx.capacity() == 0 {
157 self.metrics.blocks_total.fetch_add(1, Ordering::Relaxed);
158 }
159 self.tx
160 .send(event)
161 .await
162 .map_err(|_| BackpressureError::ChannelClosed)?;
163 }
164 }
165
166 Ok(())
167 }
168
169 pub const fn metrics(&self) -> &Arc<StageBufferMetrics> {
171 &self.metrics
172 }
173
174 pub fn sender(&self) -> mpsc::Sender<SharedEvent> {
176 self.tx.clone()
177 }
178}
179
/// Errors surfaced by [`StageBuffer::send`].
#[derive(Debug, thiserror::Error)]
pub enum BackpressureError {
    /// The receiving half of the stage channel was dropped.
    #[error("channel closed")]
    ChannelClosed,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;

    /// Block strategy: a single event flows through and is counted once.
    #[tokio::test]
    async fn test_stage_buffer_block_strategy() {
        let cfg = StageBufferConfig {
            capacity: 10,
            when_full: WhenFull::Block,
        };
        let (buffer, mut rx) = StageBuffer::new(cfg);

        buffer.send(Arc::new(Event::new("Test"))).await.unwrap();

        let got = rx.recv().await.unwrap();
        assert_eq!(got.event_type.as_ref(), "Test");
        assert_eq!(buffer.metrics().events_received.load(Ordering::Relaxed), 1);
    }

    /// DropNewest: once the channel is full, extra sends count as drops
    /// instead of blocking.
    #[tokio::test]
    async fn test_stage_buffer_drop_newest() {
        let cfg = StageBufferConfig {
            capacity: 2,
            when_full: WhenFull::DropNewest,
        };
        let (buffer, _rx) = StageBuffer::new(cfg);

        // "A" and "B" fill the channel; "C" is discarded.
        for name in ["A", "B", "C"] {
            buffer.send(Arc::new(Event::new(name))).await.unwrap();
        }

        let metrics = buffer.metrics();
        assert_eq!(metrics.events_received.load(Ordering::Relaxed), 3);
        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 1);
    }

    /// Defaults match the serde defaults: capacity 1000, blocking.
    #[tokio::test]
    async fn test_stage_buffer_default_config() {
        let cfg = StageBufferConfig::default();
        assert_eq!(cfg.capacity, 1000);
        assert!(matches!(cfg.when_full, WhenFull::Block));
    }

    /// Strategies serialize with a snake_case internal tag.
    #[test]
    fn test_when_full_serialization() {
        let json = serde_json::to_string(&WhenFull::Block).unwrap();
        assert!(json.contains("block"));

        let json = serde_json::to_string(&WhenFull::Overflow {
            secondary_capacity: 500,
        })
        .unwrap();
        assert!(json.contains("overflow"));
        assert!(json.contains("500"));
    }

    /// Config survives a JSON round trip unchanged.
    #[test]
    fn test_stage_buffer_config_serialization() {
        let original = StageBufferConfig {
            capacity: 2000,
            when_full: WhenFull::DropNewest,
        };
        let encoded = serde_json::to_string(&original).unwrap();
        let round_trip: StageBufferConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(round_trip.capacity, 2000);
        assert!(matches!(round_trip.when_full, WhenFull::DropNewest));
    }
}