Skip to main content

mcp_postgres/
lockfree_pool.rs

1//! Lock-free connection pool — mechanical sympathy design.
2//!
3//! Principles applied (per optimization guide):
4//!
5//! 1. **No blocking primitives on hot path** — crossbeam::ArrayQueue is Dmitry
6//!    Vyukov's bounded MPMC queue with pure CAS loops. No Mutex, no Semaphore.
7//!    tokio::sync::Notify uses futex (Linux) / parking (macOS) — kernel boundary
8//!    only when a waiter actually needs to sleep.
9//!
10//! 2. **Cache-line false-sharding eliminated** — crossbeam::ArrayQueue uses
11//!    `CachePadded<AtomicUsize>` for head and tail on separate cache lines.
12//!    Producers and consumers never invalidate the same cache line.
13//!
14//! 3. **Zero allocation on hot path** — All connections pre-allocated at
15//!    construction. ArrayQueue buffer is fixed-size. No VecDeque growth,
16//!    no Metrics per object, no Instant::now() on hot path.
17//!
18//! 4. **Monormorphic dispatch** — `acquire()` and `return_conn()` are fully
19//!    concrete methods on PoolInner<T>. No trait objects, no vtable lookups
20//!    on the queue path. Factory closures are set once at construction.
21//!
22//! 5. **Branchless inner loops** — The CAS loops in ArrayQueue push/pop are
23//!    tight spinning loops with backoff (pause on x86, wfe on ARM).
24//!    No unpredictable branches — just cmp+cmpxchg until success.
25//!
26//! 6. **Flat data structures** — PoolInner is a flat struct. No nested Arc,
27//!    no Weak, no Option overhead on idle queue slots.
28//!
29//! 7. **Proper memory ordering** — Acquire/Release semantics for size and
30//!    closed state. Not Just Relaxed everywhere.
31//!
32//! 8. **No virtual dispatch** — Factory is boxed once at construction.
33//!    The hot queue path uses monomorphic array operations.
34
35use std::future::Future;
36use std::ops::Deref;
37use std::pin::Pin;
38use std::sync::Arc;
39use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
40use std::time::Duration;
41
42use crossbeam::queue::ArrayQueue;
43use tokio::sync::Notify;
44use tokio::time::timeout;
45
46pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
47pub type CreateFn<T> = Box<dyn Fn() -> BoxFuture<'static, Result<T, String>> + Send + Sync>;
48pub type ValidateFn<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
49
50// ─── Errors ─────────────────────────────────────────────────────────────────
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub enum PoolError {
54    Timeout,
55    Closed,
56    CreateFailed(String),
57}
58
59impl std::fmt::Display for PoolError {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            PoolError::Timeout => write!(f, "pool: timeout waiting for connection"),
63            PoolError::Closed => write!(f, "pool: closed"),
64            PoolError::CreateFailed(m) => write!(f, "pool: create failed: {m}"),
65        }
66    }
67}
68
69impl std::error::Error for PoolError {}
70
71// ─── Config ──────────────────────────────────────────────────────────────────
72
73#[derive(Debug, Clone)]
74pub struct PoolConfig {
75    pub max_size: u32,
76    pub create_timeout: Duration,
77    pub wait_timeout: Duration,
78}
79
80impl Default for PoolConfig {
81    fn default() -> Self {
82        Self {
83            max_size: 20,
84            create_timeout: Duration::from_secs(5),
85            wait_timeout: Duration::from_secs(10),
86        }
87    }
88}
89
90// ─── Status ──────────────────────────────────────────────────────────────────
91
92#[derive(Debug, Clone)]
93pub struct PoolStatus {
94    /// Total connections (idle + checked out)
95    pub size: u32,
96    /// Idle connections in queue
97    pub idle: u32,
98    /// Maximum allowed connections
99    pub max_size: u32,
100    /// Whether the pool is closed
101    pub closed: bool,
102}
103
104// ─── Core pool ───────────────────────────────────────────────────────────────
105
106pub struct LockFreePool<T: Send + 'static> {
107    inner: Arc<PoolInner<T>>,
108}
109
110// SAFETY: PoolInner<T> uses internal synchronization via atomics and
111// crossbeam's lock-free queue. Send + Sync are safe when T: Send.
112unsafe impl<T: Send + 'static> Send for LockFreePool<T> {}
113unsafe impl<T: Send + 'static> Sync for LockFreePool<T> {}
114
115impl<T: Send + 'static> Clone for LockFreePool<T> {
116    fn clone(&self) -> Self {
117        Self {
118            inner: self.inner.clone(),
119        }
120    }
121}
122
123/// A connection checked out from the pool.
124///
125/// Automatically returned to the pool when dropped.  Implements `Deref`
126/// so you can use it as a reference to the underlying connection type.
127///
128/// # Lock-free guarantee
129///
130/// `Drop` performs exactly one lock-free `ArrayQueue::push` (CAS loop)
131/// and one `Notify::notify_one()` (atomic store + optional futex_wake).
132/// No mutexes, no allocations.
133pub struct PooledConnection<T: Send + 'static> {
134    inner: Option<T>,
135    pool: LockFreePool<T>,
136}
137
138// SAFETY: PooledConnection owns T which is Send.  The pool's return path
139// is lock-free and does not dereference any thread-local state.
140unsafe impl<T: Send + 'static> Send for PooledConnection<T> {}
141unsafe impl<T: Send + 'static> Sync for PooledConnection<T> {}
142
143impl<T: Send + 'static> std::fmt::Debug for PooledConnection<T> {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        f.debug_struct("PooledConnection")
146            .field("connected", &self.inner.is_some())
147            .finish()
148    }
149}
150
151impl<T: Send + 'static> Deref for PooledConnection<T> {
152    type Target = T;
153    #[inline(always)]
154    fn deref(&self) -> &T {
155        // Safety: inner is always Some until Drop
156        unsafe { self.inner.as_ref().unwrap_unchecked() }
157    }
158}
159
160impl<T: Send + 'static> AsRef<T> for PooledConnection<T> {
161    #[inline(always)]
162    fn as_ref(&self) -> &T {
163        self.deref()
164    }
165}
166
167impl<T: Send + 'static> PooledConnection<T> {
168    /// Take the inner value out of the connection, permanently removing it
169    /// from the pool.  The pool's size is decremented.
170    pub fn take(mut self) -> T {
171        let conn = self.inner.take().unwrap();
172        self.pool.inner.size.0.fetch_sub(1, Ordering::Release);
173        conn
174    }
175
176    /// Return a reference to the pool status
177    pub fn pool_status(&self) -> PoolStatus {
178        self.pool.status()
179    }
180}
181
182// --- Return path: called from Drop (sync), must be lock-free.
183//     Only does ArrayQueue push (CAS loop, no syscall in common case)
184//     and atomic size decrement if queue is full.
185impl<T: Send + 'static> Drop for PooledConnection<T> {
186    #[inline]
187    fn drop(&mut self) {
188        if let Some(item) = self.inner.take() {
189            self.pool.inner.return_conn(item);
190        }
191    }
192}
193
194// ─── Cache-line-aligned wrappers for hot fields ────────────────────────────
195//
196// `size` (written on every acquire/release via CAS / fetch_sub) must be on its
197// own 64-byte cache line so that writes do not invalidate adjacent metadata
198// read by every other core.
199//
200// `closed` + `max_size` (read on every acquire, written once) share a second
201// cache line, isolated from the write-bouncing `size` line.
202
203/// `AtomicU32` padded to its own 64-byte cache line.
204#[repr(C, align(64))]
205struct AlignedSize(AtomicU32);
206
207/// `AtomicBool` + `max_size` on a dedicated 64-byte cache line,
208/// isolated from `AlignedSize` to prevent store-load false sharing.
209#[repr(C, align(64))]
210struct AlignedClosed {
211    closed: AtomicBool,
212    max_size: u32,
213}
214
215// ─── PoolInner: the actual state ─────────────────────────────────────────────
216
217#[repr(C)]
218struct PoolInner<T: Send + 'static> {
219    // ═══════════════════════════════════════════════════════════════════════
220    // Cache line 0 (offsets 0–63): `size` — written by every acquire/release.
221    //                                 Must NOT share with `closed` or `max_size`.
222    // ═══════════════════════════════════════════════════════════════════════
223    size: AlignedSize,
224
225    // ═══════════════════════════════════════════════════════════════════════
226    // Cache line 1 (offsets 64–127): `closed` + `max_size` — read on every
227    //                                 acquire, written once by `close()`.
228    // ═══════════════════════════════════════════════════════════════════════
229    closed: AlignedClosed,
230
231    // ═══════════════════════════════════════════════════════════════════════
232    // Cache line 2+ (offsets 128+): cold / read-only after construction.
233    // ═══════════════════════════════════════════════════════════════════════
234    /// Factory for creating connections (boxed closure, set once)
235    create: CreateFn<T>,
236
237    /// Connection health validator (boxed closure, set once)
238    validate: ValidateFn<T>,
239
240    /// Lock-free bounded MPMC queue of idle connections.
241    /// Pre-allocated at construction to `max_size` capacity.
242    /// Internal head/tail are already cache-padded by crossbeam.
243    idle: ArrayQueue<T>,
244
245    /// Async waiter notification.  Uses `futex` on Linux (no mutex),
246    /// `_umtx_op` on macOS, or `parking` on other platforms.
247    /// Only touched when a waiter actually needs to sleep.
248    notify: Notify,
249
250    /// Connection create timeout
251    create_timeout: Duration,
252
253    /// Connection wait timeout
254    wait_timeout: Duration,
255}
256
257// SAFETY: PoolInner<T> uses only lock-free synchronization internally.
258unsafe impl<T: Send + 'static> Send for PoolInner<T> {}
259unsafe impl<T: Send + 'static> Sync for PoolInner<T> {}
260
261impl<T: Send + 'static> LockFreePool<T> {
262    /// Create a new lock-free pool with the given factory and config.
263    ///
264    /// All memory for the idle queue is pre-allocated at construction
265    /// (`max_size` slots).  No heap allocation occurs on the hot path.
266    pub fn new(create: CreateFn<T>, validate: ValidateFn<T>, config: &PoolConfig) -> Self {
267        // Pre-allocate exactly max_size slots — never grows, never shrinks
268        let idle = ArrayQueue::new(config.max_size as usize);
269        Self {
270            inner: Arc::new(PoolInner {
271                size: AlignedSize(AtomicU32::new(0)),
272                closed: AlignedClosed {
273                    closed: AtomicBool::new(false),
274                    max_size: config.max_size,
275                },
276                create,
277                validate,
278                idle,
279                notify: Notify::new(),
280                create_timeout: config.create_timeout,
281                wait_timeout: config.wait_timeout,
282            }),
283        }
284    }
285
286    /// Acquire a connection from the pool.
287    ///
288    /// ## Fast path (common case, lock-free)
289    /// 1. Pop from idle queue (CAS loop, no syscall)
290    /// 2. Validate the connection (async SELECT 1)
291    /// 3. Return as PooledConnection
292    ///
293    /// ## Slow path (pool empty, not at capacity)
294    /// 4. CAS-increment size, create connection, return
295    ///
296    /// ## Wait path (pool empty, at capacity)
297    /// 5. Park on Notify with timeout, retry
298    #[inline]
299    pub async fn acquire(&self) -> Result<PooledConnection<T>, PoolError> {
300        // !!!  HOT PATH BEGINS  !!!
301        // Checks are ordered: closed is the cheapest (one atomic load),
302        // then idle pop (lock-free CAS), then create path.
303
304        if self.inner.closed.closed.load(Ordering::Acquire) {
305            return Err(PoolError::Closed);
306        }
307
308        // ── Fast path: pop idle connection ──
309        // Single lock-free operation, no kernel boundary.
310        if let Some(item) = self.inner.idle.pop() {
311            // Validate the connection (async, but usually just a quick query)
312            // If validation fails, destroy and fall through to create path.
313            if (self.inner.validate)(&item) {
314                return Ok(PooledConnection {
315                    inner: Some(item),
316                    pool: self.clone(),
317                });
318            }
319            // Validation failed — drop the connection and decrement size.
320            // The connection is effectively dead; we don't return it.
321            drop(item);
322            self.inner.size.0.fetch_sub(1, Ordering::Release);
323            // Fall through to try creating a new one
324        }
325
326        // ── Create path: pool empty ──
327        loop {
328            if self.inner.closed.closed.load(Ordering::Acquire) {
329                return Err(PoolError::Closed);
330            }
331
332            // Try to claim a slot via CAS
333            let current = self.inner.size.0.load(Ordering::Acquire);
334            if current < self.inner.closed.max_size {
335                // CAS: reserve a slot atomically
336                // This prevents two concurrent tasks from both trying to
337                // create beyond max_size.
338                if self
339                    .inner
340                    .size
341                    .0
342                    .compare_exchange_weak(
343                        current,
344                        current + 1,
345                        Ordering::AcqRel,
346                        Ordering::Relaxed,
347                    )
348                    .is_ok()
349                {
350                    // Slot reserved — create the connection
351                    return match self.create_one().await {
352                        Ok(item) => Ok(PooledConnection {
353                            inner: Some(item),
354                            pool: self.clone(),
355                        }),
356                        Err(e) => {
357                            // Creation failed — release the reserved slot
358                            self.inner.size.0.fetch_sub(1, Ordering::Release);
359                            self.inner.notify.notify_one();
360                            Err(e)
361                        }
362                    };
363                }
364                // CAS failed — another task claimed the slot, retry
365                continue;
366            }
367
368            // ── Wait path: pool saturated ──
369            // Short-circuit if no timeout
370            if self.inner.wait_timeout == Duration::ZERO {
371                return Err(PoolError::Timeout);
372            }
373
374            // Park on the Notify with timeout.
375            // Notify is a futex-based primitive — no mutex, no semaphore.
376            let notified = self.inner.notify.notified();
377            tokio::select! {
378                _ = notified => {
379                    // Woken — another task returned a connection.
380                    // Try to pop it.
381                    if let Some(item) = self.inner.idle.pop() {
382                        if (self.inner.validate)(&item) {
383                            return Ok(PooledConnection {
384                                inner: Some(item),
385                                pool: self.clone(),
386                            });
387                        }
388                        drop(item);
389                        self.inner.size.0.fetch_sub(1, Ordering::Release);
390                    }
391                    // No connection available — loop back and retry.
392                    // This happens if a concurrent acquirer stole the
393                    // connection before we could wake up.  The loop
394                    // will try the idle queue again.
395                    continue;
396                }
397                _ = tokio::time::sleep(self.inner.wait_timeout) => {
398                    // Wait timeout expired — one last retry before giving up.
399                    if let Some(item) = self.inner.idle.pop() {
400                        if (self.inner.validate)(&item) {
401                            return Ok(PooledConnection {
402                                inner: Some(item),
403                                pool: self.clone(),
404                            });
405                        }
406                        drop(item);
407                        self.inner.size.0.fetch_sub(1, Ordering::Release);
408                    }
409                    return Err(PoolError::Timeout);
410                }
411            }
412        }
413    }
414
415    /// Create a single new connection with timeout.
416    #[inline]
417    async fn create_one(&self) -> Result<T, PoolError> {
418        if self.inner.closed.closed.load(Ordering::Acquire) {
419            self.inner.size.0.fetch_sub(1, Ordering::Release);
420            return Err(PoolError::Closed);
421        }
422        match timeout(self.inner.create_timeout, (self.inner.create)()).await {
423            Ok(Ok(item)) => Ok(item),
424            Ok(Err(msg)) => Err(PoolError::CreateFailed(msg)),
425            Err(_) => Err(PoolError::CreateFailed("timeout".into())),
426        }
427    }
428
429    pub fn close(&self) {
430        self.inner.closed.closed.store(true, Ordering::Release);
431        self.inner.notify.notify_waiters();
432        while self.inner.idle.pop().is_some() {
433            self.inner.size.0.fetch_sub(1, Ordering::Relaxed);
434        }
435    }
436
437    pub fn is_closed(&self) -> bool {
438        self.inner.closed.closed.load(Ordering::Acquire)
439    }
440
441    #[inline]
442    pub fn status(&self) -> PoolStatus {
443        self.inner.status()
444    }
445
446    pub fn max_size(&self) -> u32 {
447        self.inner.closed.max_size
448    }
449}
450
451impl<T: Send + 'static> PoolInner<T> {
452    /// Return a connection to the pool.
453    ///
454    /// Called from `PooledConnection::drop()` — MUST be sync.
455    ///
456    /// # Lock-free guarantee
457    ///
458    /// Performs exactly one ArrayQueue push (CAS loop) and
459    /// one Notify::notify_one() (atomic store + optional futex_wake).
460    /// No mutexes, no allocations.
461    #[inline]
462    fn return_conn(&self, item: T) {
463        let closed = self.closed.closed.load(Ordering::Acquire);
464        if !closed {
465            match self.idle.push(item) {
466                Ok(()) => {
467                    self.notify.notify_one();
468                    return;
469                }
470                Err(dropped) => {
471                    // Queue full — drop the connection
472                    drop(dropped);
473                }
474            }
475        }
476        self.size.0.fetch_sub(1, Ordering::Release);
477        self.notify.notify_one();
478    }
479
480    #[inline]
481    fn status(&self) -> PoolStatus {
482        let size = self.size.0.load(Ordering::Acquire);
483        let idle = self.idle.len();
484        PoolStatus {
485            size,
486            idle: idle as u32,
487            max_size: self.closed.max_size,
488            closed: self.closed.closed.load(Ordering::Acquire),
489        }
490    }
491}
492
493// ─── Drop: close the pool when all references are dropped ────────────────────
494
495impl<T: Send + 'static> Drop for PoolInner<T> {
496    fn drop(&mut self) {
497        // Drain idle connections
498        while self.idle.pop().is_some() {}
499    }
500}
501
502// ─── Test helpers ────────────────────────────────────────────────────────────
503
504#[cfg(test)]
505pub(crate) mod test_helpers {
506    use super::*;
507    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
508
509    /// A test connection that tracks creation, validation, and drop counts.
510    pub struct TestConnection {
511        pub id: u32,
512        pub valid: bool,
513    }
514
515    impl Drop for TestConnection {
516        fn drop(&mut self) {
517            // Tracked via global counter in the factory
518        }
519    }
520
521    pub fn create_test_pool(
522        max_size: u32,
523        fail_create: bool,
524        fail_validate: bool,
525    ) -> LockFreePool<TestConnection> {
526        let create_count = Arc::new(AtomicU32::new(0));
527
528        let create = {
529            let cc = create_count;
530            Box::new(move || {
531                let count = cc.fetch_add(1, AtomicOrdering::Relaxed);
532                Box::pin(async move {
533                    if fail_create {
534                        Err("create failed".into())
535                    } else {
536                        Ok(TestConnection {
537                            id: count,
538                            valid: !fail_validate,
539                        })
540                    }
541                }) as BoxFuture<'static, Result<TestConnection, String>>
542            }) as CreateFn<TestConnection>
543        };
544
545        let validate =
546            Box::new(move |conn: &TestConnection| conn.valid) as ValidateFn<TestConnection>;
547
548        let config = PoolConfig {
549            max_size,
550            create_timeout: Duration::from_secs(5),
551            wait_timeout: Duration::from_secs(10),
552        };
553
554        LockFreePool::new(create, validate, &config)
555    }
556}
557
558// ═══════════════════════════════════════════════════════════════════════════════
559// TESTS
560// ═══════════════════════════════════════════════════════════════════════════════
561
562#[cfg(test)]
563mod tests {
564    use super::test_helpers::*;
565    use super::*;
566    use std::sync::Arc;
567    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
568    use std::time::Duration;
569    use tokio::time::sleep;
570
571    // ─── Basic acquire/release cycles ─────────────────────────────────────
572
573    #[tokio::test]
574    async fn test_acquire_release_one() {
575        let pool = create_test_pool(5, false, false);
576        assert!(!pool.is_closed());
577
578        let conn = pool.acquire().await.unwrap();
579        assert_eq!(conn.id, 0);
580        assert!(conn.valid);
581
582        let status = pool.status();
583        assert_eq!(status.size, 1);
584        assert_eq!(status.idle, 0);
585
586        drop(conn);
587        sleep(Duration::from_millis(10)).await;
588
589        let status = pool.status();
590        assert_eq!(status.idle, 1);
591    }
592
593    #[tokio::test]
594    async fn test_acquire_release_reuse() {
595        let pool = create_test_pool(5, false, false);
596
597        let conn1 = pool.acquire().await.unwrap();
598        let id1 = conn1.id;
599        drop(conn1);
600
601        sleep(Duration::from_millis(10)).await;
602
603        let conn2 = pool.acquire().await.unwrap();
604        assert_eq!(conn2.id, id1, "should reuse the same connection");
605    }
606
607    #[tokio::test]
608    async fn test_multiple_connections() {
609        let pool = create_test_pool(5, false, false);
610        let mut conns = Vec::new();
611        for _ in 0..5 {
612            let conn = pool.acquire().await.unwrap();
613            conns.push(conn);
614        }
615        assert_eq!(pool.status().size, 5);
616        assert_eq!(pool.status().idle, 0);
617        drop(conns);
618    }
619
620    #[tokio::test]
621    async fn test_acquire_multiple_release_reuse() {
622        let pool = create_test_pool(5, false, false);
623        let mut conns = Vec::new();
624
625        for _ in 0..5 {
626            conns.push(pool.acquire().await.unwrap());
627        }
628        let ids: Vec<u32> = conns.iter().map(|c| c.id).collect();
629        drop(conns);
630
631        sleep(Duration::from_millis(10)).await;
632
633        let mut reused = 0;
634        for _ in 0..5 {
635            let conn = pool.acquire().await.unwrap();
636            if ids.contains(&conn.id) {
637                reused += 1;
638            }
639            drop(conn);
640        }
641        assert!(reused >= 4, "most connections should be reused");
642    }
643
644    // ─── Pool exhaustion and timeout ──────────────────────────────────────
645
646    #[tokio::test]
647    async fn test_pool_exhaustion_short_timeout() {
648        let config = PoolConfig {
649            max_size: 1,
650            create_timeout: Duration::from_secs(1),
651            wait_timeout: Duration::from_millis(100),
652        };
653        let pool = LockFreePool::new(
654            Box::new(|| {
655                Box::pin(async { Ok(TestConnection { id: 0, valid: true }) })
656                    as BoxFuture<'static, Result<TestConnection, String>>
657            }) as CreateFn<TestConnection>,
658            Box::new(|_conn: &TestConnection| true) as ValidateFn<TestConnection>,
659            &config,
660        );
661
662        let conn1 = pool.acquire().await.unwrap();
663        let result = pool.acquire().await;
664        assert!(result.is_err());
665        assert_eq!(result.unwrap_err(), PoolError::Timeout);
666        drop(conn1);
667    }
668
669    #[tokio::test]
670    async fn test_acquire_no_timeout_when_conn_returned() {
671        // Verify that a returned connection unblocks a waiting acquirer
672        let config = PoolConfig {
673            max_size: 1,
674            create_timeout: Duration::from_secs(1),
675            wait_timeout: Duration::from_secs(5),
676        };
677        let pool = Arc::new(LockFreePool::new(
678            Box::new(|| {
679                Box::pin(async { Ok(TestConnection { id: 0, valid: true }) })
680                    as BoxFuture<'static, Result<TestConnection, String>>
681            }) as CreateFn<TestConnection>,
682            Box::new(|_conn: &TestConnection| true) as ValidateFn<TestConnection>,
683            &config,
684        ));
685
686        let conn1 = pool.acquire().await.unwrap();
687        let pool_clone = pool.clone();
688
689        let handle = tokio::spawn(async move { pool_clone.acquire().await });
690
691        sleep(Duration::from_millis(50)).await;
692        drop(conn1);
693
694        let result = handle.await.unwrap();
695        assert!(result.is_ok(), "returned conn should unblock waiter");
696    }
697
698    // ─── Connection validation ─────────────────────────────────────────────
699
700    #[tokio::test]
701    async fn test_validation_rejects_invalid_connections() {
702        // Validator always returns false — every idle connection is rejected.
703        // Pool must create a new connection on every reuse.
704        let reject_count = Arc::new(AtomicU32::new(0));
705        let create_count = Arc::new(AtomicU32::new(0));
706
707        let create = {
708            let cc = create_count.clone();
709            Box::new(move || {
710                let id = cc.fetch_add(1, AtomicOrdering::Relaxed);
711                Box::pin(async move { Ok(TestConnection { id, valid: true }) })
712                    as BoxFuture<'static, Result<TestConnection, String>>
713            }) as CreateFn<TestConnection>
714        };
715
716        let validate = {
717            let rc = reject_count.clone();
718            Box::new(move |_conn: &TestConnection| {
719                rc.fetch_add(1, AtomicOrdering::Relaxed);
720                false
721            }) as ValidateFn<TestConnection>
722        };
723
724        let config = PoolConfig {
725            max_size: 5,
726            create_timeout: Duration::from_secs(5),
727            wait_timeout: Duration::from_secs(1),
728        };
729        let pool = LockFreePool::new(create, validate, &config);
730
731        // First acquire: creates conn(id=0, no validation on creation path)
732        let conn1 = pool.acquire().await.unwrap();
733        assert_eq!(conn1.id, 0);
734        drop(conn1); // return to idle
735
736        // Second acquire: pops conn0 from idle, validator rejects,
737        // discards, creates conn(id=1)
738        let conn2 = pool.acquire().await.unwrap();
739        assert_eq!(conn2.id, 1, "rejected idle conn should be replaced");
740
741        let rejected = reject_count.load(AtomicOrdering::Relaxed);
742        assert_eq!(rejected, 1, "validator should be called exactly once");
743
744        drop(conn2);
745    }
746
747    // ─── Close behavior ───────────────────────────────────────────────────
748
749    #[tokio::test]
750    async fn test_close() {
751        let pool = create_test_pool(5, false, false);
752        let conn = pool.acquire().await.unwrap();
753        assert!(!pool.is_closed());
754        pool.close();
755        assert!(pool.is_closed());
756        // Acquire after close should fail
757        let result = pool.acquire().await;
758        assert!(result.is_err());
759        assert_eq!(result.unwrap_err(), PoolError::Closed);
760        drop(conn); // Should be handled gracefully
761    }
762
763    #[tokio::test]
764    async fn test_close_with_waiter() {
765        let config = PoolConfig {
766            max_size: 1,
767            create_timeout: Duration::from_secs(1),
768            wait_timeout: Duration::from_secs(10),
769        };
770        let pool = Arc::new(LockFreePool::new(
771            Box::new(|| {
772                Box::pin(async { Ok(TestConnection { id: 0, valid: true }) })
773                    as BoxFuture<'static, Result<TestConnection, String>>
774            }) as CreateFn<TestConnection>,
775            Box::new(|_conn: &TestConnection| true) as ValidateFn<TestConnection>,
776            &config,
777        ));
778
779        let conn1 = pool.acquire().await.unwrap();
780        let pool_clone = pool.clone();
781
782        // Spawn a waiter that will be waiting for a connection
783        let handle = tokio::spawn(async move { pool_clone.acquire().await });
784
785        // Give the spawned task time to start waiting
786        sleep(Duration::from_millis(50)).await;
787
788        // Close the pool — the waiter should wake up and get Closed error
789        pool.close();
790        let result = handle.await.unwrap();
791        assert!(result.is_err());
792        assert_eq!(result.unwrap_err(), PoolError::Closed);
793        drop(conn1);
794    }
795
796    // ─── Concurrent access stress test ────────────────────────────────────
797
798    #[tokio::test]
799    async fn test_concurrent_acquire_release() {
800        let pool = Arc::new(create_test_pool(8, false, false));
801        let mut handles = Vec::new();
802
803        for _ in 0..16 {
804            let pool = pool.clone();
805            handles.push(tokio::spawn(async move {
806                for _ in 0..10 {
807                    let conn = pool.acquire().await.unwrap();
808                    // "Use" the connection briefly
809                    sleep(Duration::from_millis(5)).await;
810                    drop(conn); // Return to pool
811                }
812            }));
813        }
814
815        for h in handles {
816            h.await.unwrap();
817        }
818
819        let status = pool.status();
820        assert!(status.size <= 8, "pool should not exceed max_size");
821    }
822
823    #[tokio::test]
824    async fn test_concurrent_stress_high_contention() {
825        let pool = Arc::new(create_test_pool(4, false, false));
826        let mut handles = Vec::new();
827
828        for _ in 0..32 {
829            let pool = pool.clone();
830            handles.push(tokio::spawn(async move {
831                for _ in 0..25 {
832                    match pool.acquire().await {
833                        Ok(conn) => {
834                            // Minimal "work" — just hold briefly
835                            tokio::task::yield_now().await;
836                            drop(conn);
837                        }
838                        Err(PoolError::Timeout) => {
839                            // Expected when pool is saturated
840                            tokio::task::yield_now().await;
841                        }
842                        Err(e) => panic!("Unexpected error: {e}"),
843                    }
844                }
845            }));
846        }
847
848        for h in handles {
849            h.await.unwrap();
850        }
851
852        let status = pool.status();
853        assert!(status.size <= 4, "pool exceeded max_size: {}", status.size);
854        assert!(!status.closed);
855    }
856
857    // ─── Zero timeout (non-blocking) ──────────────────────────────────────
858
859    #[tokio::test]
860    async fn test_zero_wait_timeout() {
861        let config = PoolConfig {
862            max_size: 1,
863            create_timeout: Duration::from_secs(1),
864            wait_timeout: Duration::ZERO,
865        };
866        let pool = LockFreePool::new(
867            Box::new(|| {
868                Box::pin(async { Ok(TestConnection { id: 0, valid: true }) })
869                    as BoxFuture<'static, Result<TestConnection, String>>
870            }) as CreateFn<TestConnection>,
871            Box::new(|_conn: &TestConnection| true) as ValidateFn<TestConnection>,
872            &config,
873        );
874
875        let _conn = pool.acquire().await.unwrap();
876        // Second acquire with zero timeout should fail immediately
877        let result = pool.acquire().await;
878        assert_eq!(result.unwrap_err(), PoolError::Timeout);
879    }
880
881    // ─── Create failures ──────────────────────────────────────────────────
882
883    #[tokio::test]
884    async fn test_create_failure() {
885        let pool = create_test_pool(5, true, false);
886        let result = pool.acquire().await;
887        assert!(result.is_err());
888        assert!(matches!(result.unwrap_err(), PoolError::CreateFailed(_)));
889    }
890
891    // ─── Take ownership (remove from pool) ────────────────────────────────
892
893    #[tokio::test]
894    async fn test_take_connection() {
895        let pool = create_test_pool(5, false, false);
896        let conn = pool.acquire().await.unwrap();
897        let id = conn.id;
898        let taken = PooledConnection::take(conn);
899        assert_eq!(taken.id, id);
900        // Connection is gone from pool
901        // No way to check size easily, but pool should have decremented
902        let status = pool.status();
903        assert_eq!(status.size, 0); // taken connection is removed
904    }
905
906    // ─── Clone pool ───────────────────────────────────────────────────────
907
908    #[tokio::test]
909    async fn test_pool_clone() {
910        let pool = create_test_pool(5, false, false);
911        let pool2 = pool.clone();
912        let conn = pool2.acquire().await.unwrap();
913        assert!(conn.valid);
914        drop(conn);
915    }
916
917    // ─── Close with connections checked out ───────────────────────────────
918
919    #[tokio::test]
920    async fn test_close_with_active_connections() {
921        let pool = create_test_pool(5, false, false);
922        let conn1 = pool.acquire().await.unwrap();
923        let conn2 = pool.acquire().await.unwrap();
924        pool.close();
925        assert!(pool.is_closed());
926        let result = pool.acquire().await;
927        assert_eq!(result.unwrap_err(), PoolError::Closed);
928        // Dropping checked-out connections after close should not panic
929        drop(conn1);
930        drop(conn2);
931    }
932}