Skip to main content

bsql_core/
singleflight.rs

1//! Singleflight request coalescing for query deduplication.
2//!
3//! When multiple async tasks issue the SAME query (same sql_hash + same parameter
4//! bytes) simultaneously, only one actually executes against PostgreSQL. The others
5//! wait for the result and receive a shared copy via a broadcast channel.
6//!
7//! This is opt-in: enabled via `Pool::builder().singleflight(true)`.
8//!
9//! # Key design
10//!
11//! Key = hash of (sql_hash, parameter bytes). We use rapidhash to combine the
12//! sql_hash with a hash of the parameter slice. If a request is already in-flight
13//! with the same key, the caller subscribes to its broadcast channel instead of
14//! executing a new query.
15//!
16//! # Limitations
17//!
18//! - Only coalesces `query_raw` and `query_raw_readonly` (not `execute_raw`).
19//!   Writes must not be coalesced.
20//! - The result is `Arc`-shared, so callers receive the same data (no mutations).
21//! - Large result sets are shared by reference, reducing memory for hot reads.
22
23use std::collections::HashMap;
24use std::sync::{Arc, Mutex};
25
26use tokio::sync::broadcast;
27
28use crate::error::BsqlError;
29
30/// Shared result type broadcast to waiting tasks.
31type SharedResult = Arc<Result<Arc<OwnedResultSnapshot>, BsqlError>>;
32
33/// The in-flight map type: key -> broadcast sender.
34type InFlightMap = Arc<Mutex<HashMap<u64, broadcast::Sender<SharedResult>>>>;
35
36/// A snapshot of query results that can be shared across tasks.
37///
38/// Unlike `OwnedResult`, this does not own an arena — the data has been
39/// copied into owned `Vec<u8>` storage for safe sharing.
40pub struct OwnedResultSnapshot {
41    /// The query result metadata (column offsets, column descriptors).
42    pub result: bsql_driver_postgres::QueryResult,
43    /// Arena data copied into owned storage for sharing.
44    pub arena: bsql_driver_postgres::Arena,
45}
46
47/// Singleflight coalescing layer.
48///
49/// Tracks in-flight queries by key. Concurrent identical queries share results.
50pub struct Singleflight {
51    /// In-flight queries: key -> broadcast sender.
52    /// Uses std::sync::Mutex because the critical section is trivial
53    /// (HashMap insert/remove — no I/O).
54    /// Wrapped in Arc so FlightLeader can hold a back-reference for cleanup on drop.
55    in_flight: InFlightMap,
56}
57
58/// Result of attempting to join a singleflight group.
59pub enum FlightResult {
60    /// This task is the leader — it should execute the query.
61    Leader(FlightLeader),
62    /// Another task is already executing this query — wait for the result.
63    Follower(broadcast::Receiver<SharedResult>),
64}
65
66/// Handle for the leader task that will execute the query and broadcast results.
67///
68/// If the leader is dropped without calling `complete()` (e.g., the task panics),
69/// the `Drop` impl removes the key from the in-flight map so new requests don't
70/// join a dead channel. Followers on the dead channel receive a `RecvError` from
71/// the broadcast receiver, which surfaces as a query error.
72pub struct FlightLeader {
73    key: u64,
74    tx: broadcast::Sender<SharedResult>,
75    /// Back-reference to the in-flight map for cleanup on drop.
76    /// `None` after `complete()` has been called (key already removed).
77    in_flight: Option<InFlightMap>,
78}
79
80impl FlightLeader {
81    /// Broadcast the result to all waiting followers and remove from in-flight map.
82    pub fn complete(mut self, sf: &Singleflight, result: SharedResult) {
83        // Remove from in-flight first so new requests don't join a completed flight
84        sf.in_flight
85            .lock()
86            .unwrap_or_else(|e| e.into_inner())
87            .remove(&self.key);
88        // Mark as completed so Drop doesn't double-remove
89        self.in_flight = None;
90        // Broadcast to followers (ignore send errors — no receivers is fine)
91        let _ = self.tx.send(result);
92    }
93}
94
95impl Drop for FlightLeader {
96    fn drop(&mut self) {
97        // If complete() was not called (e.g., leader task panicked), remove
98        // the key from the in-flight map. This ensures new requests become
99        // leaders instead of joining a dead broadcast channel.
100        if let Some(ref map) = self.in_flight {
101            map.lock()
102                .unwrap_or_else(|e| e.into_inner())
103                .remove(&self.key);
104        }
105    }
106}
107
108impl Singleflight {
109    /// Create a new singleflight coalescing layer.
110    pub fn new() -> Self {
111        Self {
112            in_flight: Arc::new(Mutex::new(HashMap::new())),
113        }
114    }
115
116    /// Try to join an in-flight query group, or become the leader.
117    ///
118    /// `key` should be a hash of (sql_hash, parameter bytes).
119    pub fn try_join(&self, key: u64) -> FlightResult {
120        let mut map = self.in_flight.lock().unwrap_or_else(|e| e.into_inner());
121
122        if let Some(tx) = map.get(&key) {
123            // Another task is already executing — subscribe
124            FlightResult::Follower(tx.subscribe())
125        } else {
126            // We are the leader — create broadcast channel
127            // Capacity 1: only one result will ever be sent
128            let (tx, _) = broadcast::channel(1);
129            map.insert(key, tx.clone());
130            FlightResult::Leader(FlightLeader {
131                key,
132                tx,
133                in_flight: Some(Arc::clone(&self.in_flight)),
134            })
135        }
136    }
137
138    /// Compute a singleflight key from sql_hash and parameter data.
139    ///
140    /// Uses rapidhash to combine the sql_hash with a hash of all parameter
141    /// bytes (including actual encoded values). Two queries with the same SQL
142    /// and same parameter values produce the same key.
143    pub fn compute_key(
144        sql_hash: u64,
145        params: &[&(dyn bsql_driver_postgres::Encode + Sync)],
146    ) -> u64 {
147        use std::hash::{Hash, Hasher};
148        let mut hasher = rapidhash::quality::RapidHasher::default();
149        sql_hash.hash(&mut hasher);
150        let mut scratch = Vec::with_capacity(64);
151        // Hash each parameter's actual encoded bytes (not just type OID)
152        for param in params {
153            if param.is_null() {
154                hasher.write_u8(0xFF); // NULL marker
155            } else {
156                scratch.clear();
157                param.encode_binary(&mut scratch);
158                hasher.write(&scratch);
159            }
160        }
161        hasher.finish()
162    }
163}
164
165impl Default for Singleflight {
166    fn default() -> Self {
167        Self::new()
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use super::*;
174
175    #[test]
176    fn singleflight_leader_when_empty() {
177        let sf = Singleflight::new();
178        let result = sf.try_join(42);
179        assert!(matches!(result, FlightResult::Leader(_)));
180    }
181
182    #[test]
183    fn singleflight_follower_when_in_flight() {
184        let sf = Singleflight::new();
185        let _leader = sf.try_join(42);
186        let result = sf.try_join(42);
187        assert!(matches!(result, FlightResult::Follower(_)));
188    }
189
190    #[test]
191    fn singleflight_different_keys_both_leaders() {
192        let sf = Singleflight::new();
193        let r1 = sf.try_join(42);
194        let r2 = sf.try_join(43);
195        assert!(matches!(r1, FlightResult::Leader(_)));
196        assert!(matches!(r2, FlightResult::Leader(_)));
197    }
198
199    #[test]
200    fn singleflight_complete_removes_from_map() {
201        let sf = Singleflight::new();
202        let leader = match sf.try_join(42) {
203            FlightResult::Leader(l) => l,
204            _ => panic!("expected leader"),
205        };
206        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("test".into()));
207        leader.complete(&sf, Arc::new(Err(err)));
208
209        // After completion, same key should produce a new leader
210        let result = sf.try_join(42);
211        assert!(matches!(result, FlightResult::Leader(_)));
212    }
213
214    #[test]
215    fn compute_key_same_inputs_same_key() {
216        let k1 = Singleflight::compute_key(123, &[]);
217        let k2 = Singleflight::compute_key(123, &[]);
218        assert_eq!(k1, k2);
219    }
220
221    #[test]
222    fn compute_key_different_sql_hash_different_key() {
223        let k1 = Singleflight::compute_key(123, &[]);
224        let k2 = Singleflight::compute_key(456, &[]);
225        assert_ne!(k1, k2);
226    }
227
228    // --- compute_key with actual params ---
229
230    #[test]
231    fn compute_key_same_params_same_key() {
232        let a = 42i32;
233        let b = 42i32;
234        let k1 = Singleflight::compute_key(100, &[&a]);
235        let k2 = Singleflight::compute_key(100, &[&b]);
236        assert_eq!(k1, k2);
237    }
238
239    #[test]
240    fn compute_key_different_params_different_key() {
241        let a = 42i32;
242        let b = 99i32;
243        let k1 = Singleflight::compute_key(100, &[&a]);
244        let k2 = Singleflight::compute_key(100, &[&b]);
245        assert_ne!(k1, k2);
246    }
247
248    #[test]
249    fn compute_key_different_sql_same_params_different_key() {
250        let a = 42i32;
251        let k1 = Singleflight::compute_key(100, &[&a]);
252        let k2 = Singleflight::compute_key(200, &[&a]);
253        assert_ne!(k1, k2);
254    }
255
256    #[test]
257    fn compute_key_null_param_handling() {
258        // Option<i32> = None encodes as NULL
259        let null_val: Option<i32> = None;
260        let some_val: Option<i32> = Some(42);
261        let k1 = Singleflight::compute_key(100, &[&null_val]);
262        let k2 = Singleflight::compute_key(100, &[&some_val]);
263        assert_ne!(k1, k2, "NULL and Some(42) should produce different keys");
264    }
265
266    #[test]
267    fn compute_key_two_nulls_same_key() {
268        let a: Option<i32> = None;
269        let b: Option<i32> = None;
270        let k1 = Singleflight::compute_key(100, &[&a]);
271        let k2 = Singleflight::compute_key(100, &[&b]);
272        assert_eq!(k1, k2);
273    }
274
275    #[test]
276    fn compute_key_multiple_params() {
277        let a = 1i32;
278        let b = "hello";
279        let k1 = Singleflight::compute_key(100, &[&a, &b]);
280        let k2 = Singleflight::compute_key(100, &[&a, &b]);
281        assert_eq!(k1, k2);
282    }
283
284    #[test]
285    fn compute_key_param_order_matters() {
286        let a = 1i32;
287        let b = 2i32;
288        let k1 = Singleflight::compute_key(100, &[&a, &b]);
289        let k2 = Singleflight::compute_key(100, &[&b, &a]);
290        assert_ne!(k1, k2);
291    }
292
293    // --- FlightLeader::complete broadcasts result ---
294
295    #[tokio::test]
296    async fn leader_complete_broadcasts_to_follower() {
297        let sf = Singleflight::new();
298
299        let leader = match sf.try_join(42) {
300            FlightResult::Leader(l) => l,
301            _ => panic!("expected leader"),
302        };
303
304        let mut rx = match sf.try_join(42) {
305            FlightResult::Follower(rx) => rx,
306            _ => panic!("expected follower"),
307        };
308
309        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("test".into()));
310        leader.complete(&sf, Arc::new(Err(err)));
311
312        let received = rx.recv().await.unwrap();
313        assert!(received.is_err());
314    }
315
316    // --- Multiple followers receive same result ---
317
318    #[tokio::test]
319    async fn multiple_followers_receive_result() {
320        let sf = Singleflight::new();
321
322        let leader = match sf.try_join(42) {
323            FlightResult::Leader(l) => l,
324            _ => panic!("expected leader"),
325        };
326
327        let mut rx1 = match sf.try_join(42) {
328            FlightResult::Follower(rx) => rx,
329            _ => panic!("expected follower 1"),
330        };
331        let mut rx2 = match sf.try_join(42) {
332            FlightResult::Follower(rx) => rx,
333            _ => panic!("expected follower 2"),
334        };
335
336        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("done".into()));
337        leader.complete(&sf, Arc::new(Err(err)));
338
339        let r1 = rx1.recv().await.unwrap();
340        let r2 = rx2.recv().await.unwrap();
341        assert!(r1.is_err());
342        assert!(r2.is_err());
343    }
344
345    // --- Drop leader without completing -> key is removed from map ---
346
347    #[test]
348    fn drop_leader_without_complete_cleans_up_map() {
349        let sf = Singleflight::new();
350
351        let leader = match sf.try_join(42) {
352            FlightResult::Leader(l) => l,
353            _ => panic!("expected leader"),
354        };
355
356        // Drop leader without calling complete (e.g., task panicked).
357        // The Drop impl removes the key from the in-flight map so new
358        // requests don't join a dead broadcast channel.
359        drop(leader);
360
361        // A new try_join for the same key should produce a NEW leader
362        // (the entry was cleaned up on drop).
363        let result = sf.try_join(42);
364        assert!(
365            matches!(result, FlightResult::Leader(_)),
366            "key should be removed from map after leader drop without complete"
367        );
368    }
369
370    // --- Concurrent stress test ---
371
372    #[tokio::test]
373    async fn concurrent_stress_test() {
374        use std::sync::atomic::{AtomicUsize, Ordering};
375        use tokio::task;
376
377        let sf = Arc::new(Singleflight::new());
378        let leader_count = Arc::new(AtomicUsize::new(0));
379        let follower_count = Arc::new(AtomicUsize::new(0));
380
381        let mut handles = Vec::new();
382
383        // 10 tasks, 5 unique keys (2 tasks per key)
384        for i in 0..10 {
385            let sf = Arc::clone(&sf);
386            let leaders = Arc::clone(&leader_count);
387            let followers = Arc::clone(&follower_count);
388            let key = (i % 5) as u64;
389
390            handles.push(task::spawn(async move {
391                match sf.try_join(key) {
392                    FlightResult::Leader(leader) => {
393                        leaders.fetch_add(1, Ordering::Relaxed);
394                        // Complete immediately
395                        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool(
396                            "stress".into(),
397                        ));
398                        leader.complete(&sf, Arc::new(Err(err)));
399                    }
400                    FlightResult::Follower(_rx) => {
401                        followers.fetch_add(1, Ordering::Relaxed);
402                    }
403                }
404            }));
405        }
406
407        for h in handles {
408            h.await.unwrap();
409        }
410
411        let total = leader_count.load(Ordering::Relaxed) + follower_count.load(Ordering::Relaxed);
412        assert_eq!(total, 10, "all 10 tasks should participate");
413        // At least 5 leaders (one per unique key)
414        assert!(
415            leader_count.load(Ordering::Relaxed) >= 5,
416            "should have at least 5 leaders (one per key)"
417        );
418    }
419
420    // --- Default trait ---
421
422    #[test]
423    fn singleflight_default() {
424        let sf = Singleflight::default();
425        // Should be able to use it
426        let result = sf.try_join(1);
427        assert!(matches!(result, FlightResult::Leader(_)));
428    }
429
430    // --- Send + Sync assertions ---
431
432    fn _assert_send<T: Send>() {}
433    fn _assert_sync<T: Sync>() {}
434
435    #[test]
436    fn singleflight_is_send_and_sync() {
437        _assert_send::<Singleflight>();
438        _assert_sync::<Singleflight>();
439    }
440
441    // --- compute_key with string params ---
442
443    #[test]
444    fn compute_key_string_params() {
445        let a = "hello";
446        let b = "world";
447        let k1 = Singleflight::compute_key(100, &[&a, &b]);
448        let k2 = Singleflight::compute_key(100, &[&a, &b]);
449        assert_eq!(k1, k2);
450    }
451
452    #[test]
453    fn compute_key_empty_params_consistent() {
454        let k1 = Singleflight::compute_key(0, &[]);
455        let k2 = Singleflight::compute_key(0, &[]);
456        assert_eq!(k1, k2);
457    }
458
459    // --- Leader complete with no followers ---
460
461    #[test]
462    fn leader_complete_with_no_followers() {
463        let sf = Singleflight::new();
464        let leader = match sf.try_join(42) {
465            FlightResult::Leader(l) => l,
466            _ => panic!("expected leader"),
467        };
468        // Complete without any followers. Should not panic.
469        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("solo".into()));
470        leader.complete(&sf, Arc::new(Err(err)));
471
472        // Key should be removed
473        let result = sf.try_join(42);
474        assert!(matches!(result, FlightResult::Leader(_)));
475    }
476
477    // --- Audit: leader dropped while followers are waiting ---
478
479    #[tokio::test]
480    async fn follower_gets_error_when_leader_dropped_without_complete() {
481        let sf = Singleflight::new();
482
483        let leader = match sf.try_join(42) {
484            FlightResult::Leader(l) => l,
485            _ => panic!("expected leader"),
486        };
487
488        let mut rx = match sf.try_join(42) {
489            FlightResult::Follower(rx) => rx,
490            _ => panic!("expected follower"),
491        };
492
493        // Drop leader without completing (simulates task panic).
494        drop(leader);
495
496        // Follower should get a RecvError (sender dropped).
497        let result = rx.recv().await;
498        assert!(
499            result.is_err(),
500            "follower should get RecvError when leader is dropped without complete"
501        );
502    }
503
504    // --- Audit: leader drop cleans up, new leader can succeed ---
505
506    #[tokio::test]
507    async fn new_leader_succeeds_after_previous_leader_dropped() {
508        let sf = Arc::new(Singleflight::new());
509
510        // First leader drops without completing (simulates panic).
511        let leader1 = match sf.try_join(42) {
512            FlightResult::Leader(l) => l,
513            _ => panic!("expected leader"),
514        };
515        drop(leader1);
516
517        // A new try_join should produce a fresh leader (not a follower on a dead channel).
518        let leader2 = match sf.try_join(42) {
519            FlightResult::Leader(l) => l,
520            _ => panic!("expected new leader after previous leader drop"),
521        };
522
523        let mut rx = match sf.try_join(42) {
524            FlightResult::Follower(rx) => rx,
525            _ => panic!("expected follower for second leader"),
526        };
527
528        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("retry".into()));
529        leader2.complete(&sf, Arc::new(Err(err)));
530
531        let received = rx.recv().await.unwrap();
532        assert!(received.is_err());
533    }
534}