// guts_node/performance/coalescing.rs

1//! Request coalescing to deduplicate concurrent identical requests.
2//!
3//! When multiple clients request the same resource simultaneously,
4//! this module ensures only one actual request is made, with the
5//! result shared among all waiters.
6
7use futures::future::{BoxFuture, Shared};
8use futures::FutureExt;
9use std::collections::HashMap;
10use std::future::Future;
11use std::hash::Hash;
12use std::sync::atomic::{AtomicU64, Ordering};
13use std::time::{Duration, Instant};
14use tokio::sync::Mutex;
15
/// Configuration for request coalescing.
#[derive(Debug, Clone)]
pub struct CoalescerConfig {
    /// Maximum age an in-flight request may reach before new callers stop
    /// joining it and instead trigger a fresh fetch for the same key.
    pub max_wait: Duration,
    /// Maximum number of in-flight requests to track; callers arriving while
    /// the map is at capacity execute their fetch directly, uncoalesced.
    pub max_in_flight: usize,
    /// TTL for cached results (if caching enabled).
    // NOTE(review): cache_ttl is never read in this module — presumably
    // reserved for a future result cache; confirm before relying on it.
    pub cache_ttl: Duration,
}
26
27impl Default for CoalescerConfig {
28    fn default() -> Self {
29        Self {
30            max_wait: Duration::from_secs(30),
31            max_in_flight: 10_000,
32            cache_ttl: Duration::from_secs(5),
33        }
34    }
35}
36
/// Point-in-time snapshot of coalescing statistics.
#[derive(Debug, Clone, Default)]
pub struct CoalescerStats {
    /// Total requests received.
    pub total_requests: u64,
    /// Requests that joined an already-in-flight fetch instead of starting
    /// their own.
    pub coalesced_requests: u64,
    /// Requests that triggered new fetches.
    pub new_fetches: u64,
    /// Number of in-flight requests at the time the snapshot was taken.
    pub in_flight: usize,
}
49
50impl CoalescerStats {
51    /// Returns the coalescing ratio.
52    pub fn coalescing_ratio(&self) -> f64 {
53        if self.total_requests == 0 {
54            0.0
55        } else {
56            self.coalesced_requests as f64 / self.total_requests as f64
57        }
58    }
59}
60
/// Bookkeeping for one in-flight request.
struct InFlight<V> {
    /// Shared handle to the running fetch; every coalesced caller awaits a
    /// clone of this future and receives a clone of the same `V`.
    future: Shared<BoxFuture<'static, V>>,
    /// When the fetch started; compared against `max_wait` for staleness.
    created_at: Instant,
}
65
/// Request coalescer for deduplicating concurrent requests.
///
/// The first caller for a key runs the fetch; concurrent callers for the
/// same key await the same shared future and receive clones of its output.
pub struct RequestCoalescer<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone + Send + 'static,
{
    /// Map of key -> currently running request, guarded by an async-aware
    /// mutex so it can be locked from async contexts.
    in_flight: Mutex<HashMap<K, InFlight<V>>>,
    /// Tuning knobs (staleness window, tracking capacity).
    config: CoalescerConfig,
    /// Lock-free counters; updated with relaxed ordering.
    stats: CoalescerStatsInner,
}
76
/// Internal atomic counters backing the public `CoalescerStats` snapshot.
struct CoalescerStatsInner {
    total_requests: AtomicU64,
    coalesced_requests: AtomicU64,
    new_fetches: AtomicU64,
}
82
83impl<K, V> RequestCoalescer<K, V>
84where
85    K: Hash + Eq + Clone + Send + Sync + 'static,
86    V: Clone + Send + 'static,
87{
88    /// Creates a new request coalescer.
89    pub fn new(config: CoalescerConfig) -> Self {
90        Self {
91            in_flight: Mutex::new(HashMap::new()),
92            config,
93            stats: CoalescerStatsInner {
94                total_requests: AtomicU64::new(0),
95                coalesced_requests: AtomicU64::new(0),
96                new_fetches: AtomicU64::new(0),
97            },
98        }
99    }
100
101    /// Creates a coalescer with default configuration.
102    pub fn with_defaults() -> Self {
103        Self::new(CoalescerConfig::default())
104    }
105
106    /// Gets a value, coalescing with any in-flight request for the same key.
107    pub async fn get_or_fetch<F, Fut>(&self, key: K, fetch: F) -> V
108    where
109        F: FnOnce() -> Fut,
110        Fut: Future<Output = V> + Send + 'static,
111    {
112        self.stats.total_requests.fetch_add(1, Ordering::Relaxed);
113
114        // Check for existing in-flight request (scope the lock)
115        let existing = {
116            let in_flight = self.in_flight.lock().await;
117            if let Some(entry) = in_flight.get(&key) {
118                if entry.created_at.elapsed() < self.config.max_wait {
119                    self.stats
120                        .coalesced_requests
121                        .fetch_add(1, Ordering::Relaxed);
122                    Some(entry.future.clone())
123                } else {
124                    None
125                }
126            } else {
127                None
128            }
129        };
130
131        // If we found an existing request, await it
132        if let Some(future) = existing {
133            return future.await;
134        }
135
136        // Check capacity (scope the lock)
137        let at_capacity = {
138            let in_flight = self.in_flight.lock().await;
139            in_flight.len() >= self.config.max_in_flight
140        };
141
142        // If at capacity, just execute directly
143        if at_capacity {
144            self.stats.new_fetches.fetch_add(1, Ordering::Relaxed);
145            return fetch().await;
146        }
147
148        // Create new request
149        self.stats.new_fetches.fetch_add(1, Ordering::Relaxed);
150
151        let future = fetch().boxed().shared();
152        let entry = InFlight {
153            future: future.clone(),
154            created_at: Instant::now(),
155        };
156
157        // Insert into map (scope the lock)
158        {
159            let mut in_flight = self.in_flight.lock().await;
160            in_flight.insert(key.clone(), entry);
161        }
162
163        let result = future.await;
164
165        // Clean up (scope the lock)
166        {
167            let mut in_flight = self.in_flight.lock().await;
168            in_flight.remove(&key);
169        }
170
171        result
172    }
173
174    /// Returns statistics.
175    pub fn stats(&self) -> CoalescerStats {
176        // Can't get in_flight count without async, return cached stats
177        CoalescerStats {
178            total_requests: self.stats.total_requests.load(Ordering::Relaxed),
179            coalesced_requests: self.stats.coalesced_requests.load(Ordering::Relaxed),
180            new_fetches: self.stats.new_fetches.load(Ordering::Relaxed),
181            in_flight: 0, // Can't get this synchronously
182        }
183    }
184
185    /// Returns statistics with in-flight count.
186    pub async fn stats_async(&self) -> CoalescerStats {
187        let in_flight = self.in_flight.lock().await;
188        CoalescerStats {
189            total_requests: self.stats.total_requests.load(Ordering::Relaxed),
190            coalesced_requests: self.stats.coalesced_requests.load(Ordering::Relaxed),
191            new_fetches: self.stats.new_fetches.load(Ordering::Relaxed),
192            in_flight: in_flight.len(),
193        }
194    }
195
196    /// Clears all in-flight requests.
197    pub async fn clear(&self) {
198        self.in_flight.lock().await.clear();
199    }
200
201    /// Removes stale entries (older than max_wait).
202    pub async fn cleanup_stale(&self) {
203        let mut in_flight = self.in_flight.lock().await;
204        in_flight.retain(|_, entry| entry.created_at.elapsed() < self.config.max_wait);
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicU32;
    use std::sync::Arc;
    use tokio::time::sleep;

    #[tokio::test]
    async fn test_coalescing_basic() {
        // A single request invokes the fetch closure exactly once and
        // returns its value.
        let coalescer = RequestCoalescer::<String, u32>::with_defaults();
        let calls = Arc::new(AtomicU32::new(0));

        let key = "test".to_string();
        let calls_in_fetch = Arc::clone(&calls);
        let value = coalescer
            .get_or_fetch(key.clone(), move || async move {
                calls_in_fetch.fetch_add(1, Ordering::Relaxed);
                42
            })
            .await;

        assert_eq!(value, 42);
        assert_eq!(calls.load(Ordering::Relaxed), 1);
    }

    #[tokio::test]
    async fn test_concurrent_coalescing() {
        // Ten concurrent requests for one key should collapse into a single
        // slow fetch, with every caller seeing the same value.
        let coalescer = Arc::new(RequestCoalescer::<String, u32>::with_defaults());
        let fetches = Arc::new(AtomicU32::new(0));
        let key = "shared".to_string();

        let handles: Vec<_> = (0..10)
            .map(|_| {
                let coalescer = Arc::clone(&coalescer);
                let key = key.clone();
                let fetches = Arc::clone(&fetches);
                tokio::spawn(async move {
                    coalescer
                        .get_or_fetch(key, move || {
                            let fetches = Arc::clone(&fetches);
                            async move {
                                fetches.fetch_add(1, Ordering::Relaxed);
                                sleep(Duration::from_millis(100)).await;
                                42
                            }
                        })
                        .await
                })
            })
            .collect();

        // Every task must resolve to the shared value.
        for handle in handles {
            assert_eq!(handle.await.unwrap(), 42);
        }

        // Exactly one underlying fetch, and at least one coalesced request.
        assert_eq!(fetches.load(Ordering::Relaxed), 1);
        assert!(coalescer.stats().coalesced_requests > 0);
    }

    #[tokio::test]
    async fn test_different_keys_not_coalesced() {
        // Distinct keys never share a fetch.
        let coalescer = RequestCoalescer::<String, u32>::with_defaults();
        let calls = Arc::new(AtomicU32::new(0));

        for i in 0..3 {
            let calls = Arc::clone(&calls);
            coalescer
                .get_or_fetch(format!("key-{}", i), move || async move {
                    calls.fetch_add(1, Ordering::Relaxed);
                    i
                })
                .await;
        }

        assert_eq!(calls.load(Ordering::Relaxed), 3);
    }

    #[tokio::test]
    async fn test_stats() {
        // Two sequential requests for distinct keys: two totals, two fetches.
        let coalescer = RequestCoalescer::<String, u32>::with_defaults();

        let _ = coalescer
            .get_or_fetch("a".to_string(), || async { 1 })
            .await;
        let _ = coalescer
            .get_or_fetch("b".to_string(), || async { 2 })
            .await;

        let snapshot = coalescer.stats();
        assert_eq!(snapshot.total_requests, 2);
        assert_eq!(snapshot.new_fetches, 2);
    }
}