Skip to main content

mcp_postgres/
lockfree_pool.rs

1//! Lock-free connection pool — mechanical sympathy design.
2//!
3//! Principles applied (per optimization guide):
4//!
5//! 1. **No blocking primitives on hot path** — crossbeam::ArrayQueue is Dmitry
6//!    Vyukov's bounded MPMC queue with pure CAS loops. No Mutex, no Semaphore.
7//!    tokio::sync::Notify uses futex (Linux) / parking (macOS) — kernel boundary
8//!    only when a waiter actually needs to sleep.
9//!
//! 2. **Cache-line false sharing eliminated** — crossbeam::ArrayQueue uses
//!    `CachePadded<AtomicUsize>` for head and tail on separate cache lines.
//!    Producers and consumers never invalidate the same cache line.
13//!
14//! 3. **Zero allocation on hot path** — All connections pre-allocated at
15//!    construction. ArrayQueue buffer is fixed-size. No VecDeque growth,
16//!    no Metrics per object, no Instant::now() on hot path.
17//!
//! 4. **Monomorphic dispatch** — `acquire()` and `return_conn()` are fully
//!    concrete methods on PoolInner<T>. No trait objects, no vtable lookups
//!    on the queue path. Factory closures are set once at construction.
21//!
22//! 5. **Branchless inner loops** — The CAS loops in ArrayQueue push/pop are
23//!    tight spinning loops with backoff (pause on x86, wfe on ARM).
24//!    No unpredictable branches — just cmp+cmpxchg until success.
25//!
26//! 6. **Flat data structures** — PoolInner is a flat struct. No nested Arc,
27//!    no Weak, no Option overhead on idle queue slots.
28//!
//! 7. **Proper memory ordering** — Acquire/Release semantics for size and
//!    closed state. Not just Relaxed everywhere.
31//!
32//! 8. **No virtual dispatch** — Factory is boxed once at construction.
33//!    The hot queue path uses monomorphic array operations.
34
35use std::future::Future;
36use std::ops::Deref;
37use std::pin::Pin;
38use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
39use std::sync::Arc;
40use std::time::Duration;
41
42use crossbeam::queue::ArrayQueue;
43use tokio::sync::Notify;
44use tokio::time::timeout;
45
/// Heap-allocated, pinned, `Send` future borrowing for `'a` and resolving to `T`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// Connection factory: every call returns a `'static` boxed future that
/// resolves to a new `T`, or to a `String` describing why creation failed.
pub type CreateFn<T> = Box<dyn Fn() -> BoxFuture<'static, Result<T, String>> + Send + Sync>;
/// Synchronous predicate over a borrowed `T`; the pool hands out an idle
/// connection only when this returns `true`.
pub type ValidateFn<T> = Box<dyn Fn(&T) -> bool + Send + Sync>;
49
50// ─── Errors ─────────────────────────────────────────────────────────────────
51
/// Failure modes reported by the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PoolError {
    /// No connection became available before the wait budget ran out.
    Timeout,
    /// The pool has been closed.
    Closed,
    /// The connection factory failed; carries the factory's message.
    CreateFailed(String),
}

impl std::fmt::Display for PoolError {
    /// Renders a short, `pool:`-prefixed description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only `CreateFailed` needs formatting machinery; the other two
        // variants are fixed strings.
        if let PoolError::CreateFailed(reason) = self {
            return write!(f, "pool: create failed: {}", reason);
        }
        let text = match self {
            PoolError::Timeout => "pool: timeout waiting for connection",
            _ => "pool: closed",
        };
        f.write_str(text)
    }
}

impl std::error::Error for PoolError {}
70
71// ─── Config ──────────────────────────────────────────────────────────────────
72
/// Tunables supplied when a pool is constructed.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on the number of connections (idle + checked out).
    pub max_size: u32,
    /// Time budget for a single run of the connection factory.
    pub create_timeout: Duration,
    /// Time an acquirer may wait when the pool is saturated.
    /// `Duration::ZERO` makes a saturated pool fail immediately.
    pub wait_timeout: Duration,
}

impl Default for PoolConfig {
    /// 20 connections, 5 s to create one, 10 s to wait for one.
    fn default() -> Self {
        let max_size = 20;
        let create_timeout = Duration::from_secs(5);
        let wait_timeout = Duration::from_secs(10);
        Self {
            max_size,
            create_timeout,
            wait_timeout,
        }
    }
}
89
90// ─── Status ──────────────────────────────────────────────────────────────────
91
/// Point-in-time snapshot of the pool's counters.
///
/// The fields are read one after another, not atomically as a group, so a
/// snapshot taken while other tasks use the pool may be slightly inconsistent.
/// The struct is plain data, hence `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PoolStatus {
    /// Total connections (idle + checked out)
    pub size: u32,
    /// Idle connections in queue
    pub idle: u32,
    /// Maximum allowed connections
    pub max_size: u32,
    /// Whether the pool is closed
    pub closed: bool,
}
103
104// ─── Core pool ───────────────────────────────────────────────────────────────
105
106pub struct LockFreePool<T: Send + 'static> {
107    inner: Arc<PoolInner<T>>,
108}
109
110// SAFETY: PoolInner<T> uses internal synchronization via atomics and
111// crossbeam's lock-free queue. Send + Sync are safe when T: Send.
112unsafe impl<T: Send + 'static> Send for LockFreePool<T> {}
113unsafe impl<T: Send + 'static> Sync for LockFreePool<T> {}
114
115impl<T: Send + 'static> Clone for LockFreePool<T> {
116    fn clone(&self) -> Self {
117        Self {
118            inner: self.inner.clone(),
119        }
120    }
121}
122
123/// A connection checked out from the pool.
124///
125/// Automatically returned to the pool when dropped.  Implements `Deref`
126/// so you can use it as a reference to the underlying connection type.
127///
128/// # Lock-free guarantee
129///
130/// `Drop` performs exactly one lock-free `ArrayQueue::push` (CAS loop)
131/// and one `Notify::notify_one()` (atomic store + optional futex_wake).
132/// No mutexes, no allocations.
133pub struct PooledConnection<T: Send + 'static> {
134    inner: Option<T>,
135    pool: LockFreePool<T>,
136}
137
138// SAFETY: PooledConnection owns T which is Send.  The pool's return path
139// is lock-free and does not dereference any thread-local state.
140unsafe impl<T: Send + 'static> Send for PooledConnection<T> {}
141unsafe impl<T: Send + 'static> Sync for PooledConnection<T> {}
142
143impl<T: Send + 'static> std::fmt::Debug for PooledConnection<T> {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        f.debug_struct("PooledConnection")
146            .field("connected", &self.inner.is_some())
147            .finish()
148    }
149}
150
151impl<T: Send + 'static> Deref for PooledConnection<T> {
152    type Target = T;
153    #[inline(always)]
154    fn deref(&self) -> &T {
155        // Safety: inner is always Some until Drop
156        unsafe { self.inner.as_ref().unwrap_unchecked() }
157    }
158}
159
160impl<T: Send + 'static> AsRef<T> for PooledConnection<T> {
161    #[inline(always)]
162    fn as_ref(&self) -> &T {
163        self.deref()
164    }
165}
166
167impl<T: Send + 'static> PooledConnection<T> {
168    /// Take the inner value out of the connection, permanently removing it
169    /// from the pool.  The pool's size is decremented.
170    pub fn take(mut self) -> T {
171        let conn = self.inner.take().unwrap();
172        self.pool.inner.size.fetch_sub(1, Ordering::Release);
173        conn
174    }
175
176    /// Return a reference to the pool status
177    pub fn pool_status(&self) -> PoolStatus {
178        self.pool.status()
179    }
180}
181
182// --- Return path: called from Drop (sync), must be lock-free.
183//     Only does ArrayQueue push (CAS loop, no syscall in common case)
184//     and atomic size decrement if queue is full.
185impl<T: Send + 'static> Drop for PooledConnection<T> {
186    #[inline]
187    fn drop(&mut self) {
188        if let Some(item) = self.inner.take() {
189            self.pool.inner.return_conn(item);
190        }
191    }
192}
193
194// ─── PoolInner: the actual state ─────────────────────────────────────────────
195
196struct PoolInner<T: Send + 'static> {
197    /// Factory for creating connections (boxed closure, set once)
198    create: CreateFn<T>,
199
200    /// Connection health validator (boxed closure, set once)
201    validate: ValidateFn<T>,
202
203    /// Lock-free bounded MPMC queue of idle connections.
204    /// Pre-allocated at construction to `max_size` capacity.
205    idle: ArrayQueue<T>,
206
207    /// Current pool size (idle + checked out).
208    /// Modified only via CAS (AcqRel semantics).
209    size: AtomicU32,
210
211    /// Maximum connections.  Immutable after construction.
212    max_size: u32,
213
214    /// Closed flag.  Set once with Release, read with Acquire.
215    closed: AtomicBool,
216
217    /// Async waiter notification.  Uses `futex` on Linux (no mutex),
218    /// `_umtx_op` on macOS, or `parking` on other platforms.
219    /// Only touched when a waiter actually needs to sleep.
220    notify: Notify,
221
222    /// Connection create timeout
223    create_timeout: Duration,
224
225    /// Connection wait timeout
226    wait_timeout: Duration,
227}
228
229// SAFETY: PoolInner<T> uses only lock-free synchronization internally.
230unsafe impl<T: Send + 'static> Send for PoolInner<T> {}
231unsafe impl<T: Send + 'static> Sync for PoolInner<T> {}
232
233impl<T: Send + 'static> LockFreePool<T> {
234    /// Create a new lock-free pool with the given factory and config.
235    ///
236    /// All memory for the idle queue is pre-allocated at construction
237    /// (`max_size` slots).  No heap allocation occurs on the hot path.
238    pub fn new(
239        create: CreateFn<T>,
240        validate: ValidateFn<T>,
241        config: PoolConfig,
242    ) -> Self {
243        // Pre-allocate exactly max_size slots — never grows, never shrinks
244        let idle = ArrayQueue::new(config.max_size as usize);
245        Self {
246            inner: Arc::new(PoolInner {
247                create,
248                validate,
249                idle,
250                size: AtomicU32::new(0),
251                max_size: config.max_size,
252                closed: AtomicBool::new(false),
253                notify: Notify::new(),
254                create_timeout: config.create_timeout,
255                wait_timeout: config.wait_timeout,
256            }),
257        }
258    }
259
260    /// Acquire a connection from the pool.
261    ///
262    /// ## Fast path (common case, lock-free)
263    /// 1. Pop from idle queue (CAS loop, no syscall)
264    /// 2. Validate the connection (async SELECT 1)
265    /// 3. Return as PooledConnection
266    ///
267    /// ## Slow path (pool empty, not at capacity)
268    /// 4. CAS-increment size, create connection, return
269    ///
270    /// ## Wait path (pool empty, at capacity)
271    /// 5. Park on Notify with timeout, retry
272    #[inline]
273    pub async fn acquire(&self) -> Result<PooledConnection<T>, PoolError> {
274        // !!!  HOT PATH BEGINS  !!!
275        // Checks are ordered: closed is the cheapest (one atomic load),
276        // then idle pop (lock-free CAS), then create path.
277
278        if self.inner.closed.load(Ordering::Acquire) {
279            return Err(PoolError::Closed);
280        }
281
282        // ── Fast path: pop idle connection ──
283        // Single lock-free operation, no kernel boundary.
284        if let Some(item) = self.inner.idle.pop() {
285            // Validate the connection (async, but usually just a quick query)
286            // If validation fails, destroy and fall through to create path.
287            if (self.inner.validate)(&item) {
288                return Ok(PooledConnection {
289                    inner: Some(item),
290                    pool: self.clone(),
291                });
292            }
293            // Validation failed — drop the connection and decrement size.
294            // The connection is effectively dead; we don't return it.
295            drop(item);
296            self.inner.size.fetch_sub(1, Ordering::Release);
297            // Fall through to try creating a new one
298        }
299
300        // ── Create path: pool empty ──
301        loop {
302            if self.inner.closed.load(Ordering::Acquire) {
303                return Err(PoolError::Closed);
304            }
305
306            // Try to claim a slot via CAS
307            let current = self.inner.size.load(Ordering::Acquire);
308            if current < self.inner.max_size {
309                // CAS: reserve a slot atomically
310                // This prevents two concurrent tasks from both trying to
311                // create beyond max_size.
312                if self.inner.size.compare_exchange_weak(
313                    current,
314                    current + 1,
315                    Ordering::AcqRel,
316                    Ordering::Relaxed,
317                ).is_ok() {
318                    // Slot reserved — create the connection
319                    return match self.create_one().await {
320                        Ok(item) => Ok(PooledConnection {
321                            inner: Some(item),
322                            pool: self.clone(),
323                        }),
324                        Err(e) => {
325                            // Creation failed — release the reserved slot
326                            self.inner.size.fetch_sub(1, Ordering::Release);
327                            self.inner.notify.notify_one();
328                            Err(e)
329                        }
330                    };
331                }
332                // CAS failed — another task claimed the slot, retry
333                continue;
334            }
335
336            // ── Wait path: pool saturated ──
337            // Short-circuit if no timeout
338            if self.inner.wait_timeout == Duration::ZERO {
339                return Err(PoolError::Timeout);
340            }
341
342            // Park on the Notify with timeout.
343            // Notify is a futex-based primitive — no mutex, no semaphore.
344            let notified = self.inner.notify.notified();
345            tokio::select! {
346                _ = notified => {
347                    // Woken — another task returned a connection.
348                    // Try to pop it.
349                    if let Some(item) = self.inner.idle.pop() {
350                        if (self.inner.validate)(&item) {
351                            return Ok(PooledConnection {
352                                inner: Some(item),
353                                pool: self.clone(),
354                            });
355                        }
356                        drop(item);
357                        self.inner.size.fetch_sub(1, Ordering::Release);
358                    }
359                    // No connection available — loop back and retry.
360                    // This happens if a concurrent acquirer stole the
361                    // connection before we could wake up.  The loop
362                    // will try the idle queue again.
363                    continue;
364                }
365                _ = tokio::time::sleep(self.inner.wait_timeout) => {
366                    // Wait timeout expired — one last retry before giving up.
367                    if let Some(item) = self.inner.idle.pop() {
368                        if (self.inner.validate)(&item) {
369                            return Ok(PooledConnection {
370                                inner: Some(item),
371                                pool: self.clone(),
372                            });
373                        }
374                        drop(item);
375                        self.inner.size.fetch_sub(1, Ordering::Release);
376                    }
377                    return Err(PoolError::Timeout);
378                }
379            }
380        }
381    }
382
383    /// Create a single new connection with timeout.
384    #[inline]
385    async fn create_one(&self) -> Result<T, PoolError> {
386        if self.inner.closed.load(Ordering::Acquire) {
387            self.inner.size.fetch_sub(1, Ordering::Release);
388            return Err(PoolError::Closed);
389        }
390        match timeout(self.inner.create_timeout, (self.inner.create)()).await {
391            Ok(Ok(item)) => Ok(item),
392            Ok(Err(msg)) => Err(PoolError::CreateFailed(msg)),
393            Err(_) => Err(PoolError::CreateFailed("timeout".into())),
394        }
395    }
396
397    pub fn close(&self) {
398        self.inner.closed.store(true, Ordering::Release);
399        self.inner.notify.notify_waiters();
400        while self.inner.idle.pop().is_some() {
401            self.inner.size.fetch_sub(1, Ordering::Relaxed);
402        }
403    }
404
405    pub fn is_closed(&self) -> bool {
406        self.inner.closed.load(Ordering::Acquire)
407    }
408
409    #[inline]
410    pub fn status(&self) -> PoolStatus {
411        self.inner.status()
412    }
413
414    pub fn max_size(&self) -> u32 {
415        self.inner.max_size
416    }
417}
418
419impl<T: Send + 'static> PoolInner<T> {
420    /// Return a connection to the pool.
421    ///
422    /// Called from `PooledConnection::drop()` — MUST be sync.
423    ///
424    /// # Lock-free guarantee
425    ///
426    /// Performs exactly one ArrayQueue push (CAS loop) and
427    /// one Notify::notify_one() (atomic store + optional futex_wake).
428    /// No mutexes, no allocations.
429    #[inline]
430    fn return_conn(&self, item: T) {
431        let closed = self.closed.load(Ordering::Acquire);
432        if !closed {
433            match self.idle.push(item) {
434                Ok(()) => {
435                    self.notify.notify_one();
436                    return;
437                }
438                Err(dropped) => {
439                    // Queue full — drop the connection
440                    drop(dropped);
441                }
442            }
443        }
444        self.size.fetch_sub(1, Ordering::Release);
445        self.notify.notify_one();
446    }
447
448    #[inline]
449    fn status(&self) -> PoolStatus {
450        let size = self.size.load(Ordering::Acquire);
451        let idle = self.idle.len();
452        PoolStatus {
453            size,
454            idle: idle as u32,
455            max_size: self.max_size,
456            closed: self.closed.load(Ordering::Acquire),
457        }
458    }
459}
460
461// ─── Drop: close the pool when all references are dropped ────────────────────
462
463impl<T: Send + 'static> Drop for PoolInner<T> {
464    fn drop(&mut self) {
465        // Drain idle connections
466        while self.idle.pop().is_some() {}
467    }
468}
469
470// ─── Test helpers ────────────────────────────────────────────────────────────
471
#[cfg(test)]
pub(crate) mod test_helpers {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};

    /// Minimal stand-in for a real connection: an id assigned by the factory
    /// and the answer the validator should give for it.
    pub struct TestConnection {
        pub id: u32,
        pub valid: bool,
    }

    impl Drop for TestConnection {
        fn drop(&mut self) {
            // Intentionally empty: nothing is recorded on drop.
        }
    }

    /// Build a pool of `TestConnection`s.
    ///
    /// * `fail_create` — the factory always fails with `"create failed"`.
    /// * `fail_validate` — created connections carry `valid: false`, so the
    ///   validator rejects them when they are reused.
    ///
    /// Ids count up from 0, one per factory call (failed calls included).
    pub fn create_test_pool(
        max_size: u32,
        fail_create: bool,
        fail_validate: bool,
    ) -> LockFreePool<TestConnection> {
        let next_id = Arc::new(AtomicU32::new(0));

        let factory = Box::new(move || {
            let id = next_id.fetch_add(1, AtomicOrdering::Relaxed);
            Box::pin(async move {
                if !fail_create {
                    Ok(TestConnection {
                        id,
                        valid: !fail_validate,
                    })
                } else {
                    Err("create failed".into())
                }
            }) as BoxFuture<'static, Result<TestConnection, String>>
        }) as CreateFn<TestConnection>;

        let validator =
            Box::new(|conn: &TestConnection| conn.valid) as ValidateFn<TestConnection>;

        LockFreePool::new(
            factory,
            validator,
            PoolConfig {
                max_size,
                create_timeout: Duration::from_secs(5),
                wait_timeout: Duration::from_secs(10),
            },
        )
    }
}
524
525// ═══════════════════════════════════════════════════════════════════════════════
526// TESTS
527// ═══════════════════════════════════════════════════════════════════════════════
528
#[cfg(test)]
mod tests {
    use super::test_helpers::*;
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering as AtomicOrdering};
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::time::sleep;

    /// Pool with exactly one slot whose factory always succeeds (`id: 0`)
    /// and whose validator accepts everything. Only the wait timeout varies
    /// between the tests that use it.
    fn single_slot_pool(wait_timeout: Duration) -> LockFreePool<TestConnection> {
        LockFreePool::new(
            Box::new(|| {
                Box::pin(async { Ok(TestConnection { id: 0, valid: true }) })
                    as BoxFuture<'static, Result<TestConnection, String>>
            }) as CreateFn<TestConnection>,
            Box::new(|_conn: &TestConnection| true) as ValidateFn<TestConnection>,
            PoolConfig {
                max_size: 1,
                create_timeout: Duration::from_secs(1),
                wait_timeout,
            },
        )
    }

    // ─── Basic acquire/release cycles ─────────────────────────────────────

    #[tokio::test]
    async fn test_acquire_release_one() {
        let pool = create_test_pool(5, false, false);
        assert!(!pool.is_closed());

        let conn = pool.acquire().await.unwrap();
        assert_eq!(conn.id, 0);
        assert!(conn.valid);

        let status = pool.status();
        assert_eq!(status.size, 1);
        assert_eq!(status.idle, 0);

        // Returning a connection happens synchronously inside `Drop`, so
        // the status can be checked right away.
        drop(conn);

        let status = pool.status();
        assert_eq!(status.idle, 1);
    }

    #[tokio::test]
    async fn test_acquire_release_reuse() {
        let pool = create_test_pool(5, false, false);

        let conn1 = pool.acquire().await.unwrap();
        let id1 = conn1.id;
        drop(conn1);

        let conn2 = pool.acquire().await.unwrap();
        assert_eq!(conn2.id, id1, "should reuse the same connection");
    }

    #[tokio::test]
    async fn test_multiple_connections() {
        let pool = create_test_pool(5, false, false);
        let mut conns = Vec::new();
        for _ in 0..5 {
            let conn = pool.acquire().await.unwrap();
            conns.push(conn);
        }
        assert_eq!(pool.status().size, 5);
        assert_eq!(pool.status().idle, 0);
        drop(conns);
    }

    #[tokio::test]
    async fn test_acquire_multiple_release_reuse() {
        let pool = create_test_pool(5, false, false);
        let mut conns = Vec::new();

        for _ in 0..5 {
            conns.push(pool.acquire().await.unwrap());
        }
        let ids: Vec<u32> = conns.iter().map(|c| c.id).collect();
        drop(conns);

        let mut reused = 0;
        for _ in 0..5 {
            let conn = pool.acquire().await.unwrap();
            if ids.contains(&conn.id) {
                reused += 1;
            }
            drop(conn);
        }
        // All five connections are idle and valid, so every acquire is
        // served from the queue.
        assert_eq!(reused, 5, "every acquire should reuse an idle connection");
    }

    // ─── Pool exhaustion and timeout ──────────────────────────────────────

    #[tokio::test]
    async fn test_pool_exhaustion_short_timeout() {
        let pool = single_slot_pool(Duration::from_millis(100));

        let conn1 = pool.acquire().await.unwrap();
        let result = pool.acquire().await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), PoolError::Timeout);
        drop(conn1);
    }

    #[tokio::test]
    async fn test_acquire_no_timeout_when_conn_returned() {
        // Verify that a returned connection unblocks a waiting acquirer
        let pool = Arc::new(single_slot_pool(Duration::from_secs(5)));

        let conn1 = pool.acquire().await.unwrap();
        let pool_clone = pool.clone();

        let handle = tokio::spawn(async move {
            pool_clone.acquire().await
        });

        // Give the spawned task time to start waiting
        sleep(Duration::from_millis(50)).await;
        drop(conn1);

        let result = handle.await.unwrap();
        assert!(result.is_ok(), "returned conn should unblock waiter");
    }

    // ─── Connection validation ─────────────────────────────────────────────

    #[tokio::test]
    async fn test_validation_rejects_invalid_connections() {
        // Validator always returns false — every idle connection is rejected.
        // Pool must create a new connection on every reuse.
        let reject_count = Arc::new(AtomicU32::new(0));
        let create_count = Arc::new(AtomicU32::new(0));

        let create = {
            let cc = create_count.clone();
            Box::new(move || {
                let id = cc.fetch_add(1, AtomicOrdering::Relaxed);
                Box::pin(async move {
                    Ok(TestConnection { id, valid: true })
                }) as BoxFuture<'static, Result<TestConnection, String>>
            }) as CreateFn<TestConnection>
        };

        let validate = {
            let rc = reject_count.clone();
            Box::new(move |_conn: &TestConnection| {
                rc.fetch_add(1, AtomicOrdering::Relaxed);
                false
            }) as ValidateFn<TestConnection>
        };

        let pool = LockFreePool::new(
            create,
            validate,
            PoolConfig {
                max_size: 5,
                create_timeout: Duration::from_secs(5),
                wait_timeout: Duration::from_secs(1),
            },
        );

        // First acquire: creates conn(id=0, no validation on creation path)
        let conn1 = pool.acquire().await.unwrap();
        assert_eq!(conn1.id, 0);
        drop(conn1); // return to idle

        // Second acquire: pops conn0 from idle, validator rejects,
        // discards, creates conn(id=1)
        let conn2 = pool.acquire().await.unwrap();
        assert_eq!(conn2.id, 1, "rejected idle conn should be replaced");

        let rejected = reject_count.load(AtomicOrdering::Relaxed);
        assert_eq!(rejected, 1, "validator should be called exactly once");

        drop(conn2);
    }

    // ─── Close behavior ───────────────────────────────────────────────────

    #[tokio::test]
    async fn test_close() {
        let pool = create_test_pool(5, false, false);
        let conn = pool.acquire().await.unwrap();
        assert!(!pool.is_closed());
        pool.close();
        assert!(pool.is_closed());
        // Acquire after close should fail
        let result = pool.acquire().await;
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), PoolError::Closed);
        drop(conn); // Should be handled gracefully
    }

    #[tokio::test]
    async fn test_close_with_waiter() {
        let pool = Arc::new(single_slot_pool(Duration::from_secs(10)));

        let conn1 = pool.acquire().await.unwrap();
        let pool_clone = pool.clone();

        // Spawn a waiter that will be waiting for a connection
        let handle = tokio::spawn(async move {
            pool_clone.acquire().await
        });

        // Give the spawned task time to start waiting
        sleep(Duration::from_millis(50)).await;

        // Close the pool — the waiter should wake up and get Closed error
        pool.close();
        let result = handle.await.unwrap();
        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), PoolError::Closed);
        drop(conn1);
    }

    // ─── Concurrent access stress test ────────────────────────────────────

    #[tokio::test]
    async fn test_concurrent_acquire_release() {
        let pool = Arc::new(create_test_pool(8, false, false));
        let mut handles = Vec::new();

        for _ in 0..16 {
            let pool = pool.clone();
            handles.push(tokio::spawn(async move {
                for _ in 0..10 {
                    let conn = pool.acquire().await.unwrap();
                    // "Use" the connection briefly
                    sleep(Duration::from_millis(5)).await;
                    drop(conn); // Return to pool
                }
            }));
        }

        for h in handles {
            h.await.unwrap();
        }

        let status = pool.status();
        assert!(status.size <= 8, "pool should not exceed max_size");
    }

    #[tokio::test]
    async fn test_concurrent_stress_high_contention() {
        let pool = Arc::new(create_test_pool(4, false, false));
        let mut handles = Vec::new();

        for _ in 0..32 {
            let pool = pool.clone();
            handles.push(tokio::spawn(async move {
                for _ in 0..25 {
                    match pool.acquire().await {
                        Ok(conn) => {
                            // Minimal "work" — just hold briefly
                            tokio::task::yield_now().await;
                            drop(conn);
                        }
                        Err(PoolError::Timeout) => {
                            // Expected when pool is saturated
                            tokio::task::yield_now().await;
                        }
                        Err(e) => panic!("Unexpected error: {e}"),
                    }
                }
            }));
        }

        for h in handles {
            h.await.unwrap();
        }

        let status = pool.status();
        assert!(status.size <= 4, "pool exceeded max_size: {}", status.size);
        assert!(!status.closed);
    }

    // ─── Zero timeout (non-blocking) ──────────────────────────────────────

    #[tokio::test]
    async fn test_zero_wait_timeout() {
        let pool = single_slot_pool(Duration::ZERO);

        let _conn = pool.acquire().await.unwrap();
        // Second acquire with zero timeout should fail immediately
        let result = pool.acquire().await;
        assert_eq!(result.unwrap_err(), PoolError::Timeout);
    }

    // ─── Create failures ──────────────────────────────────────────────────

    #[tokio::test]
    async fn test_create_failure() {
        let pool = create_test_pool(5, true, false);
        let result = pool.acquire().await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), PoolError::CreateFailed(_)));
    }

    // ─── Take ownership (remove from pool) ────────────────────────────────

    #[tokio::test]
    async fn test_take_connection() {
        let pool = create_test_pool(5, false, false);
        let conn = pool.acquire().await.unwrap();
        let id = conn.id;
        let taken = PooledConnection::take(conn);
        assert_eq!(taken.id, id);
        // The connection has left the pool, so the pool's size drops back
        // to zero and nothing was returned to the idle queue.
        let status = pool.status();
        assert_eq!(status.size, 0);
        assert_eq!(status.idle, 0);
    }

    // ─── Clone pool ───────────────────────────────────────────────────────

    #[tokio::test]
    async fn test_pool_clone() {
        let pool = create_test_pool(5, false, false);
        let pool2 = pool.clone();
        let conn = pool2.acquire().await.unwrap();
        assert!(conn.valid);
        drop(conn);
    }

    // ─── Close with connections checked out ───────────────────────────────

    #[tokio::test]
    async fn test_close_with_active_connections() {
        let pool = create_test_pool(5, false, false);
        let conn1 = pool.acquire().await.unwrap();
        let conn2 = pool.acquire().await.unwrap();
        pool.close();
        assert!(pool.is_closed());
        let result = pool.acquire().await;
        assert_eq!(result.unwrap_err(), PoolError::Closed);
        // Dropping checked-out connections after close should not panic
        drop(conn1);
        drop(conn2);
    }
}
903        // Dropping checked-out connections after close should not panic
904        drop(conn1);
905        drop(conn2);
906    }
907}