guts_node/performance/
coalescing.rs1use futures::future::{BoxFuture, Shared};
8use futures::FutureExt;
9use std::collections::HashMap;
10use std::future::Future;
11use std::hash::Hash;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::time::{Duration, Instant};
14use tokio::sync::Mutex;
15
/// Tunable limits for a [`RequestCoalescer`].
#[derive(Clone, Debug)]
pub struct CoalescerConfig {
    /// Maximum age of an in-flight entry for later requests to still join it.
    ///
    /// `get_or_fetch` ignores entries at least this old and `cleanup_stale`
    /// drops them from the map.
    pub max_wait: Duration,
    /// Upper bound on the number of tracked in-flight entries.
    ///
    /// Once the map holds this many entries, additional fetches are executed
    /// directly without being registered for coalescing.
    pub max_in_flight: usize,
    /// Time-to-live for cached results.
    ///
    /// NOTE(review): nothing in this file reads this field; confirm whether it
    /// is consumed elsewhere or is reserved for future use.
    pub cache_ttl: Duration,
}

impl Default for CoalescerConfig {
    /// Returns the stock configuration: a 30 second join window, room for
    /// 10 000 tracked fetches, and a 5 second cache TTL.
    fn default() -> Self {
        let max_wait = Duration::from_secs(30);
        let cache_ttl = Duration::from_secs(5);

        Self {
            max_wait,
            max_in_flight: 10_000,
            cache_ttl,
        }
    }
}
36
/// Point-in-time snapshot of a coalescer's counters.
#[derive(Clone, Debug, Default)]
pub struct CoalescerStats {
    /// Every call to `get_or_fetch`, whether it joined an existing fetch or
    /// started a new one.
    pub total_requests: u64,
    /// Calls that were served by joining an already in-flight fetch.
    pub coalesced_requests: u64,
    /// Calls that executed their own fetch.
    pub new_fetches: u64,
    /// Number of entries in the in-flight map, as filled in by the method
    /// that produced this snapshot.
    pub in_flight: usize,
}

impl CoalescerStats {
    /// Fraction of requests that were coalesced, in the range `0.0..=1.0`.
    ///
    /// Returns `0.0` when no requests have been recorded, so the result is
    /// never NaN.
    pub fn coalescing_ratio(&self) -> f64 {
        match self.total_requests {
            0 => 0.0,
            total => self.coalesced_requests as f64 / total as f64,
        }
    }
}
60
61struct InFlight<V> {
62 future: Shared<BoxFuture<'static, V>>,
63 created_at: Instant,
64}
65
66pub struct RequestCoalescer<K, V>
68where
69 K: Hash + Eq + Clone,
70 V: Clone + Send + 'static,
71{
72 in_flight: Mutex<HashMap<K, InFlight<V>>>,
73 config: CoalescerConfig,
74 stats: CoalescerStatsInner,
75}
76
/// Atomic counters backing the public statistics snapshot.
struct CoalescerStatsInner {
    /// Incremented once per `get_or_fetch` call.
    total_requests: AtomicU64,
    /// Incremented when a call joins an existing in-flight fetch.
    coalesced_requests: AtomicU64,
    /// Incremented when a call runs its own fetch.
    new_fetches: AtomicU64,
}
82
83impl<K, V> RequestCoalescer<K, V>
84where
85 K: Hash + Eq + Clone + Send + Sync + 'static,
86 V: Clone + Send + 'static,
87{
88 pub fn new(config: CoalescerConfig) -> Self {
90 Self {
91 in_flight: Mutex::new(HashMap::new()),
92 config,
93 stats: CoalescerStatsInner {
94 total_requests: AtomicU64::new(0),
95 coalesced_requests: AtomicU64::new(0),
96 new_fetches: AtomicU64::new(0),
97 },
98 }
99 }
100
101 pub fn with_defaults() -> Self {
103 Self::new(CoalescerConfig::default())
104 }
105
106 pub async fn get_or_fetch<F, Fut>(&self, key: K, fetch: F) -> V
108 where
109 F: FnOnce() -> Fut,
110 Fut: Future<Output = V> + Send + 'static,
111 {
112 self.stats.total_requests.fetch_add(1, Ordering::Relaxed);
113
114 let existing = {
116 let in_flight = self.in_flight.lock().await;
117 if let Some(entry) = in_flight.get(&key) {
118 if entry.created_at.elapsed() < self.config.max_wait {
119 self.stats
120 .coalesced_requests
121 .fetch_add(1, Ordering::Relaxed);
122 Some(entry.future.clone())
123 } else {
124 None
125 }
126 } else {
127 None
128 }
129 };
130
131 if let Some(future) = existing {
133 return future.await;
134 }
135
136 let at_capacity = {
138 let in_flight = self.in_flight.lock().await;
139 in_flight.len() >= self.config.max_in_flight
140 };
141
142 if at_capacity {
144 self.stats.new_fetches.fetch_add(1, Ordering::Relaxed);
145 return fetch().await;
146 }
147
148 self.stats.new_fetches.fetch_add(1, Ordering::Relaxed);
150
151 let future = fetch().boxed().shared();
152 let entry = InFlight {
153 future: future.clone(),
154 created_at: Instant::now(),
155 };
156
157 {
159 let mut in_flight = self.in_flight.lock().await;
160 in_flight.insert(key.clone(), entry);
161 }
162
163 let result = future.await;
164
165 {
167 let mut in_flight = self.in_flight.lock().await;
168 in_flight.remove(&key);
169 }
170
171 result
172 }
173
174 pub fn stats(&self) -> CoalescerStats {
176 CoalescerStats {
178 total_requests: self.stats.total_requests.load(Ordering::Relaxed),
179 coalesced_requests: self.stats.coalesced_requests.load(Ordering::Relaxed),
180 new_fetches: self.stats.new_fetches.load(Ordering::Relaxed),
181 in_flight: 0, }
183 }
184
185 pub async fn stats_async(&self) -> CoalescerStats {
187 let in_flight = self.in_flight.lock().await;
188 CoalescerStats {
189 total_requests: self.stats.total_requests.load(Ordering::Relaxed),
190 coalesced_requests: self.stats.coalesced_requests.load(Ordering::Relaxed),
191 new_fetches: self.stats.new_fetches.load(Ordering::Relaxed),
192 in_flight: in_flight.len(),
193 }
194 }
195
196 pub async fn clear(&self) {
198 self.in_flight.lock().await.clear();
199 }
200
201 pub async fn cleanup_stale(&self) {
203 let mut in_flight = self.in_flight.lock().await;
204 in_flight.retain(|_, entry| entry.created_at.elapsed() < self.config.max_wait);
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU32;
    use std::sync::Arc;
    use tokio::time::sleep;

    /// A lone request runs its fetch exactly once and yields the fetched value.
    #[tokio::test]
    async fn test_coalescing_basic() {
        let coalescer = RequestCoalescer::<String, u32>::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));
        let seen = Arc::clone(&counter);

        let value = coalescer
            .get_or_fetch(String::from("test"), move || async move {
                seen.fetch_add(1, Ordering::Relaxed);
                42
            })
            .await;

        assert_eq!(value, 42);
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }

    /// Ten tasks asking for the same key share a single underlying fetch.
    #[tokio::test]
    async fn test_concurrent_coalescing() {
        let coalescer = Arc::new(RequestCoalescer::<String, u32>::with_defaults());
        let fetch_count = Arc::new(AtomicU32::new(0));

        let tasks: Vec<_> = (0..10)
            .map(|_| {
                let coalescer = Arc::clone(&coalescer);
                let fetch_count = Arc::clone(&fetch_count);

                tokio::spawn(async move {
                    coalescer
                        .get_or_fetch("shared".to_string(), move || async move {
                            fetch_count.fetch_add(1, Ordering::Relaxed);
                            // Keep the fetch pending long enough for the
                            // other tasks to find it in flight.
                            sleep(Duration::from_millis(100)).await;
                            42
                        })
                        .await
                })
            })
            .collect();

        let mut results = Vec::with_capacity(tasks.len());
        for outcome in futures::future::join_all(tasks).await {
            results.push(outcome.unwrap());
        }

        // Every caller observes the same value...
        assert!(results.iter().all(|&r| r == 42));

        // ...produced by exactly one fetch.
        assert_eq!(fetch_count.load(Ordering::Relaxed), 1);

        let stats = coalescer.stats();
        assert!(stats.coalesced_requests > 0);
    }

    /// Requests for distinct keys each run their own fetch.
    #[tokio::test]
    async fn test_different_keys_not_coalesced() {
        let coalescer = RequestCoalescer::<String, u32>::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        for index in 0..3 {
            let hits = Arc::clone(&counter);
            coalescer
                .get_or_fetch(format!("key-{}", index), move || async move {
                    hits.fetch_add(1, Ordering::Relaxed);
                    index
                })
                .await;
        }

        assert_eq!(counter.load(Ordering::Relaxed), 3);
    }

    /// Sequential requests for two keys are counted as two new fetches.
    #[tokio::test]
    async fn test_stats() {
        let coalescer = RequestCoalescer::<String, u32>::with_defaults();

        let _ = coalescer
            .get_or_fetch("a".to_string(), || async { 1 })
            .await;
        let _ = coalescer
            .get_or_fetch("b".to_string(), || async { 2 })
            .await;

        let CoalescerStats {
            total_requests,
            new_fetches,
            ..
        } = coalescer.stats();
        assert_eq!(total_requests, 2);
        assert_eq!(new_fetches, 2);
    }
}