Skip to main content

bsql_core/
singleflight.rs

1//! Singleflight request coalescing for query deduplication.
2//!
//! When multiple threads issue the same query (same `sql_hash` and the same
//! encoded parameter values) at the same time, only one of them executes it
//! against PostgreSQL. The others wait on a condvar and receive the leader's
//! result through a shared `Arc` instead of running the query themselves.
6//!
7//! This is opt-in: enabled via `Pool::builder().singleflight(true)`.
8//!
9//! # Key design
10//!
//! Key = a 64-bit rapidhash over `sql_hash` and the binary encoding of every
//! parameter (NULL parameters are marked explicitly). If a request with the
//! same key is already in flight, the caller waits on that flight's condvar
//! instead of executing a new query.
15//!
16//! # Limitations
17//!
18//! - Only coalesces `query_raw` and `query_raw_readonly` (not `execute_raw`).
19//!   Writes must not be coalesced.
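//! - The result is `Arc`-shared: every coalesced caller receives the same
//!   read-only data rather than its own copy, which also saves memory for
//!   large, frequently-read result sets.
//! - Coalescing compares only the 64-bit key, so two different queries whose
//!   keys collide would share one result. This is improbable, not impossible.
//! - Followers wait at most 30 seconds. On timeout, or if the leader is dropped
//!   without completing, `Singleflight::wait_for_result` returns `None`.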
22
23use std::collections::HashMap;
24use std::sync::{Arc, Condvar, Mutex};
25use std::time::Duration;
26
27use crate::error::BsqlError;
28
29/// Shared result type sent to waiting threads.
30type SharedResult = Arc<Result<Arc<OwnedResultSnapshot>, BsqlError>>;
31
32/// State shared between a leader and its followers via condvar.
33pub struct FlightState {
34    result: Mutex<Option<SharedResult>>,
35    condvar: Condvar,
36    /// Set to true when leader drops without completing.
37    /// Followers check this to break out of the wait loop.
38    cancelled: std::sync::atomic::AtomicBool,
39}
40
41/// The in-flight map type: key -> flight state.
42type InFlightMap = Arc<Mutex<HashMap<u64, Arc<FlightState>>>>;
43
44/// A snapshot of query results that can be shared across threads.
45///
46/// Unlike `OwnedResult`, this does not own an arena — the data has been
47/// copied into owned `Vec<u8>` storage for safe sharing.
48pub struct OwnedResultSnapshot {
49    /// The query result metadata (column offsets, column descriptors).
50    pub result: bsql_driver_postgres::QueryResult,
51    /// Arena data copied into owned storage for sharing.
52    pub arena: bsql_driver_postgres::Arena,
53}
54
55/// Singleflight coalescing layer.
56///
57/// Tracks in-flight queries by key. Concurrent identical queries share results.
58pub struct Singleflight {
59    /// In-flight queries: key -> flight state.
60    /// Uses std::sync::Mutex because the critical section is trivial
61    /// (HashMap insert/remove — no I/O).
62    /// Wrapped in Arc so FlightLeader can hold a back-reference for cleanup on drop.
63    in_flight: InFlightMap,
64}
65
66/// Result of attempting to join a singleflight group.
67pub enum FlightResult {
68    /// This thread is the leader — it should execute the query.
69    Leader(FlightLeader),
70    /// Another thread is already executing this query — wait for the result.
71    Follower(Arc<FlightState>),
72}
73
74/// Handle for the leader thread that will execute the query and notify followers.
75///
76/// If the leader is dropped without calling `complete()` (e.g., the thread panics),
77/// the `Drop` impl removes the key from the in-flight map so new requests don't
78/// wait on a dead condvar. Followers waiting on the condvar are woken and will
79/// find `None` in the result, which surfaces as a query error.
80pub struct FlightLeader {
81    key: u64,
82    state: Arc<FlightState>,
83    /// Back-reference to the in-flight map for cleanup on drop.
84    /// `None` after `complete()` has been called (key already removed).
85    in_flight: Option<InFlightMap>,
86}
87
88impl FlightLeader {
89    /// Send the result to all waiting followers and remove from in-flight map.
90    pub fn complete(mut self, sf: &Singleflight, result: SharedResult) {
91        // Remove from in-flight first so new requests don't join a completed flight
92        sf.in_flight
93            .lock()
94            .unwrap_or_else(|e| e.into_inner())
95            .remove(&self.key);
96        // Mark as completed so Drop doesn't double-remove
97        self.in_flight = None;
98        // Store the result and notify all waiting followers
99        *self.state.result.lock().unwrap_or_else(|e| e.into_inner()) = Some(result);
100        self.state.condvar.notify_all();
101    }
102}
103
104impl Drop for FlightLeader {
105    fn drop(&mut self) {
106        // If complete() was not called (e.g., leader thread panicked), remove
107        // the key from the in-flight map and signal cancellation.
108        if let Some(ref map) = self.in_flight {
109            map.lock()
110                .unwrap_or_else(|e| e.into_inner())
111                .remove(&self.key);
112            // Signal cancellation so followers break out of wait loop
113            self.state
114                .cancelled
115                .store(true, std::sync::atomic::Ordering::Release);
116            self.state.condvar.notify_all();
117        }
118    }
119}
120
121impl Singleflight {
122    /// Create a new singleflight coalescing layer.
123    pub fn new() -> Self {
124        Self {
125            in_flight: Arc::new(Mutex::new(HashMap::new())),
126        }
127    }
128
129    /// Try to join an in-flight query group, or become the leader.
130    ///
131    /// `key` should be a hash of (sql_hash, parameter bytes).
132    pub fn try_join(&self, key: u64) -> FlightResult {
133        let mut map = self.in_flight.lock().unwrap_or_else(|e| e.into_inner());
134
135        if let Some(state) = map.get(&key) {
136            // Another thread is already executing — wait on condvar
137            FlightResult::Follower(Arc::clone(state))
138        } else {
139            // We are the leader — create flight state
140            let state = Arc::new(FlightState {
141                result: Mutex::new(None),
142                condvar: Condvar::new(),
143                cancelled: std::sync::atomic::AtomicBool::new(false),
144            });
145            map.insert(key, Arc::clone(&state));
146            FlightResult::Leader(FlightLeader {
147                key,
148                state,
149                in_flight: Some(Arc::clone(&self.in_flight)),
150            })
151        }
152    }
153
154    /// Wait for a flight result as a follower.
155    ///
156    /// Blocks until the leader calls `complete()`, is dropped, or the 30-second
157    /// timeout expires. Returns `None` if the leader was dropped without
158    /// completing (e.g., panic) or if the wait timed out.
159    pub fn wait_for_result(state: &FlightState) -> Option<SharedResult> {
160        const SINGLEFLIGHT_TIMEOUT: Duration = Duration::from_secs(30);
161        let mut guard = state.result.lock().unwrap_or_else(|e| e.into_inner());
162        while guard.is_none() {
163            // Check if leader dropped without completing (panic, error, etc.)
164            if state.cancelled.load(std::sync::atomic::Ordering::Acquire) {
165                return None;
166            }
167            let (new_guard, wait_result) = state
168                .condvar
169                .wait_timeout(guard, SINGLEFLIGHT_TIMEOUT)
170                .unwrap_or_else(|e| e.into_inner());
171            guard = new_guard;
172            // If the wait timed out and there's still no result, give up.
173            if wait_result.timed_out() && guard.is_none() {
174                return None;
175            }
176        }
177        guard.clone()
178    }
179
180    /// Compute a singleflight key from sql_hash and parameter data.
181    ///
182    /// Uses rapidhash to combine the sql_hash with a hash of all parameter
183    /// bytes (including actual encoded values). Two queries with the same SQL
184    /// and same parameter values produce the same key.
185    pub fn compute_key(
186        sql_hash: u64,
187        params: &[&(dyn bsql_driver_postgres::Encode + Sync)],
188    ) -> u64 {
189        use std::hash::{Hash, Hasher};
190        let mut hasher = rapidhash::quality::RapidHasher::default();
191        sql_hash.hash(&mut hasher);
192        let mut scratch = Vec::with_capacity(64);
193        // Hash each parameter's actual encoded bytes (not just type OID)
194        for param in params {
195            if param.is_null() {
196                hasher.write_u8(0xFF); // NULL marker
197            } else {
198                scratch.clear();
199                param.encode_binary(&mut scratch);
200                hasher.write(&scratch);
201            }
202        }
203        hasher.finish()
204    }
205}
206
207impl Default for Singleflight {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn singleflight_leader_when_empty() {
        let sf = Singleflight::new();
        let result = sf.try_join(42);
        assert!(matches!(result, FlightResult::Leader(_)));
    }

    #[test]
    fn singleflight_follower_when_in_flight() {
        let sf = Singleflight::new();
        let _leader = sf.try_join(42);
        let result = sf.try_join(42);
        assert!(matches!(result, FlightResult::Follower(_)));
    }

    #[test]
    fn singleflight_different_keys_both_leaders() {
        let sf = Singleflight::new();
        let r1 = sf.try_join(42);
        let r2 = sf.try_join(43);
        assert!(matches!(r1, FlightResult::Leader(_)));
        assert!(matches!(r2, FlightResult::Leader(_)));
    }

    #[test]
    fn singleflight_complete_removes_from_map() {
        let sf = Singleflight::new();
        let leader = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected leader"),
        };
        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("test".into()));
        leader.complete(&sf, Arc::new(Err(err)));

        // After completion, same key should produce a new leader
        let result = sf.try_join(42);
        assert!(matches!(result, FlightResult::Leader(_)));
    }

    #[test]
    fn compute_key_same_inputs_same_key() {
        let k1 = Singleflight::compute_key(123, &[]);
        let k2 = Singleflight::compute_key(123, &[]);
        assert_eq!(k1, k2);
    }

    #[test]
    fn compute_key_different_sql_hash_different_key() {
        let k1 = Singleflight::compute_key(123, &[]);
        let k2 = Singleflight::compute_key(456, &[]);
        assert_ne!(k1, k2);
    }

    // --- compute_key with actual params ---

    #[test]
    fn compute_key_same_params_same_key() {
        let a = 42i32;
        let b = 42i32;
        let k1 = Singleflight::compute_key(100, &[&a]);
        let k2 = Singleflight::compute_key(100, &[&b]);
        assert_eq!(k1, k2);
    }

    #[test]
    fn compute_key_different_params_different_key() {
        let a = 42i32;
        let b = 99i32;
        let k1 = Singleflight::compute_key(100, &[&a]);
        let k2 = Singleflight::compute_key(100, &[&b]);
        assert_ne!(k1, k2);
    }

    #[test]
    fn compute_key_different_sql_same_params_different_key() {
        let a = 42i32;
        let k1 = Singleflight::compute_key(100, &[&a]);
        let k2 = Singleflight::compute_key(200, &[&a]);
        assert_ne!(k1, k2);
    }

    #[test]
    fn compute_key_null_param_handling() {
        // Option<i32> = None encodes as NULL
        let null_val: Option<i32> = None;
        let some_val: Option<i32> = Some(42);
        let k1 = Singleflight::compute_key(100, &[&null_val]);
        let k2 = Singleflight::compute_key(100, &[&some_val]);
        assert_ne!(k1, k2, "NULL and Some(42) should produce different keys");
    }

    #[test]
    fn compute_key_two_nulls_same_key() {
        let a: Option<i32> = None;
        let b: Option<i32> = None;
        let k1 = Singleflight::compute_key(100, &[&a]);
        let k2 = Singleflight::compute_key(100, &[&b]);
        assert_eq!(k1, k2);
    }

    #[test]
    fn compute_key_multiple_params() {
        let a = 1i32;
        let b = "hello";
        let k1 = Singleflight::compute_key(100, &[&a, &b]);
        let k2 = Singleflight::compute_key(100, &[&a, &b]);
        assert_eq!(k1, k2);
    }

    #[test]
    fn compute_key_param_order_matters() {
        let a = 1i32;
        let b = 2i32;
        let k1 = Singleflight::compute_key(100, &[&a, &b]);
        let k2 = Singleflight::compute_key(100, &[&b, &a]);
        assert_ne!(k1, k2);
    }

    // --- FlightLeader::complete notifies follower ---

    #[test]
    fn leader_complete_notifies_follower() {
        let sf = Arc::new(Singleflight::new());

        let leader = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected leader"),
        };

        let follower_state = match sf.try_join(42) {
            FlightResult::Follower(state) => state,
            _ => panic!("expected follower"),
        };

        let handle = std::thread::spawn(move || Singleflight::wait_for_result(&follower_state));

        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("test".into()));
        leader.complete(&sf, Arc::new(Err(err)));

        let received = handle.join().unwrap();
        assert!(received.is_some());
        assert!(received.unwrap().is_err());
    }

    // --- Follower that starts waiting only after completion ---

    #[test]
    fn follower_waiting_after_complete_gets_result_immediately() {
        let sf = Singleflight::new();

        let leader = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected leader"),
        };
        let follower_state = match sf.try_join(42) {
            FlightResult::Follower(s) => s,
            _ => panic!("expected follower"),
        };

        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("early".into()));
        leader.complete(&sf, Arc::new(Err(err)));

        // The result is already stored, so this must return without blocking.
        let received = Singleflight::wait_for_result(&follower_state);
        assert!(matches!(received.as_deref(), Some(Err(_))));
    }

    // --- Multiple followers receive same result ---

    #[test]
    fn multiple_followers_receive_result() {
        let sf = Arc::new(Singleflight::new());

        let leader = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected leader"),
        };

        let state1 = match sf.try_join(42) {
            FlightResult::Follower(s) => s,
            _ => panic!("expected follower 1"),
        };
        let state2 = match sf.try_join(42) {
            FlightResult::Follower(s) => s,
            _ => panic!("expected follower 2"),
        };

        let h1 = std::thread::spawn(move || Singleflight::wait_for_result(&state1));
        let h2 = std::thread::spawn(move || Singleflight::wait_for_result(&state2));

        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("done".into()));
        leader.complete(&sf, Arc::new(Err(err)));

        let r1 = h1.join().unwrap();
        let r2 = h2.join().unwrap();
        assert!(r1.is_some());
        assert!(r1.unwrap().is_err());
        assert!(r2.is_some());
        assert!(r2.unwrap().is_err());
    }

    // --- Drop leader without completing -> key is removed from map ---

    #[test]
    fn drop_leader_without_complete_cleans_up_map() {
        let sf = Singleflight::new();

        let leader = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected leader"),
        };

        // Drop leader without calling complete (e.g., thread panicked).
        // The Drop impl removes the key from the in-flight map so new
        // requests don't wait on a dead condvar.
        drop(leader);

        // A new try_join for the same key should produce a NEW leader
        // (the entry was cleaned up on drop).
        let result = sf.try_join(42);
        assert!(
            matches!(result, FlightResult::Leader(_)),
            "key should be removed from map after leader drop without complete"
        );
    }

    // --- Concurrent stress test ---

    #[test]
    fn concurrent_stress_test() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let sf = Arc::new(Singleflight::new());
        let leader_count = Arc::new(AtomicUsize::new(0));
        let follower_count = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();

        // 10 threads, 5 unique keys (2 threads per key)
        for i in 0..10 {
            let sf = Arc::clone(&sf);
            let leaders = Arc::clone(&leader_count);
            let followers = Arc::clone(&follower_count);
            let key = (i % 5) as u64;

            handles.push(std::thread::spawn(move || {
                match sf.try_join(key) {
                    FlightResult::Leader(leader) => {
                        leaders.fetch_add(1, Ordering::Relaxed);
                        // Complete immediately
                        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool(
                            "stress".into(),
                        ));
                        leader.complete(&sf, Arc::new(Err(err)));
                    }
                    FlightResult::Follower(state) => {
                        followers.fetch_add(1, Ordering::Relaxed);
                        // Every leader completes, so each follower must
                        // receive that (error) result rather than `None`.
                        let received = Singleflight::wait_for_result(&state);
                        assert!(matches!(received.as_deref(), Some(Err(_))));
                    }
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let total = leader_count.load(Ordering::Relaxed) + follower_count.load(Ordering::Relaxed);
        assert_eq!(total, 10, "all 10 threads should participate");
        // At least 5 leaders (one per unique key)
        assert!(
            leader_count.load(Ordering::Relaxed) >= 5,
            "should have at least 5 leaders (one per key)"
        );
    }

    // --- Default trait ---

    #[test]
    fn singleflight_default() {
        let sf = Singleflight::default();
        // Should be able to use it
        let result = sf.try_join(1);
        assert!(matches!(result, FlightResult::Leader(_)));
    }

    // --- Send + Sync assertions ---

    fn _assert_send<T: Send>() {}
    fn _assert_sync<T: Sync>() {}

    #[test]
    fn singleflight_is_send_and_sync() {
        _assert_send::<Singleflight>();
        _assert_sync::<Singleflight>();
    }

    // --- compute_key with string params ---

    #[test]
    fn compute_key_string_params() {
        let a = "hello";
        let b = "world";
        let k1 = Singleflight::compute_key(100, &[&a, &b]);
        let k2 = Singleflight::compute_key(100, &[&a, &b]);
        assert_eq!(k1, k2);
    }

    #[test]
    fn compute_key_empty_params_consistent() {
        let k1 = Singleflight::compute_key(0, &[]);
        let k2 = Singleflight::compute_key(0, &[]);
        assert_eq!(k1, k2);
    }

    // --- Leader complete with no followers ---

    #[test]
    fn leader_complete_with_no_followers() {
        let sf = Singleflight::new();
        let leader = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected leader"),
        };
        // Complete without any followers. Should not panic.
        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("solo".into()));
        leader.complete(&sf, Arc::new(Err(err)));

        // Key should be removed
        let result = sf.try_join(42);
        assert!(matches!(result, FlightResult::Leader(_)));
    }

    // --- Audit: leader dropped while followers are waiting ---

    #[test]
    fn follower_gets_none_when_leader_dropped_without_complete() {
        let sf = Arc::new(Singleflight::new());

        let leader = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected leader"),
        };

        let follower_state = match sf.try_join(42) {
            FlightResult::Follower(s) => s,
            _ => panic!("expected follower"),
        };

        // The follower may start waiting before or after the leader is
        // dropped. Either way the cancellation flag makes it return `None`
        // instead of waiting for a result that will never arrive.
        let handle = std::thread::spawn(move || Singleflight::wait_for_result(&follower_state));

        // Drop leader without completing (simulates thread panic).
        drop(leader);

        let received = handle.join().unwrap();
        assert!(
            received.is_none(),
            "follower must not receive a result from a cancelled flight"
        );

        // Key should be cleaned up
        let result = sf.try_join(42);
        assert!(
            matches!(result, FlightResult::Leader(_)),
            "key should be removed from map after leader drop"
        );
    }

    // --- Audit: leader drop cleans up, new leader can succeed ---

    #[test]
    fn new_leader_succeeds_after_previous_leader_dropped() {
        let sf = Arc::new(Singleflight::new());

        // First leader drops without completing (simulates panic).
        let leader1 = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected leader"),
        };
        drop(leader1);

        // A new try_join should produce a fresh leader (not a follower on a dead condvar).
        let leader2 = match sf.try_join(42) {
            FlightResult::Leader(l) => l,
            _ => panic!("expected new leader after previous leader drop"),
        };

        let follower_state = match sf.try_join(42) {
            FlightResult::Follower(s) => s,
            _ => panic!("expected follower for second leader"),
        };

        let handle = std::thread::spawn(move || Singleflight::wait_for_result(&follower_state));

        let err = BsqlError::from(bsql_driver_postgres::DriverError::Pool("retry".into()));
        leader2.complete(&sf, Arc::new(Err(err)));

        let received = handle.join().unwrap();
        assert!(received.is_some());
        assert!(received.unwrap().is_err());
    }
}