events/once/
pooled_sync.rs

1//! Pooled events that provide automatic resource management.
2//!
3//! This module provides pooled variants of events that automatically manage their lifecycle
4//! using reference counting. Events are created from pools and automatically returned to the
5//! pool when both sender and receiver are dropped.
6
7#[cfg(debug_assertions)]
8use std::backtrace::Backtrace;
9use std::future::Future;
10use std::marker::PhantomPinned;
11use std::ops::Deref;
12use std::pin::Pin;
13use std::ptr::NonNull;
14use std::sync::{Arc, Mutex};
15use std::task;
16
17use pinned_pool::{Key, PinnedPool};
18
19use crate::{Disconnected, ERR_POISONED_LOCK, OnceEvent, ReflectiveTSend, Sealed, WithTwoOwners};
20
21/// A pool that manages thread-safe events with automatic cleanup.
22///
23/// The pool creates events on demand and automatically cleans them up when both
24/// sender and receiver endpoints are dropped.
25///
26/// This pool provides zero-allocation event reuse for high-frequency eventing scenarios
27/// in a thread-safe manner.
28///
29/// # Example
30///
31/// ```rust
32/// use events::OnceEventPool;
33/// # use futures::executor::block_on;
34///
35/// # block_on(async {
36/// let pool = OnceEventPool::<i32>::new();
37///
38/// // First usage - creates new event
39/// let (sender1, receiver1) = pool.bind_by_ref();
40/// sender1.send(42);
41/// let value1 = receiver1.await.unwrap();
42/// assert_eq!(value1, 42);
43/// // Event returned to pool when sender1/receiver1 are dropped
44///
45/// // Second usage - reuses the same event instance (efficient!)
46/// let (sender2, receiver2) = pool.bind_by_ref();
47/// sender2.send(100);
48/// let value2 = receiver2.await.unwrap();
49/// assert_eq!(value2, 100);
50/// // Same event reused - no additional allocation overhead
51/// # });
52/// ```
53#[derive(Debug)]
54pub struct OnceEventPool<T>
55where
56    T: Send,
57{
58    pool: Mutex<PinnedPool<WithTwoOwners<OnceEvent<T>>>>,
59
60    // It is invalid to move this type once it has been pinned.
61    _requires_pinning: PhantomPinned,
62}
63
64impl<T> OnceEventPool<T>
65where
66    T: Send,
67{
68    /// Creates a new empty event pool.
69    ///
70    /// # Example
71    ///
72    /// ```rust
73    /// use events::OnceEventPool;
74    ///
75    /// let pool = OnceEventPool::<String>::new();
76    /// ```
77    #[must_use]
78    pub fn new() -> Self {
79        Self {
80            pool: Mutex::new(PinnedPool::new()),
81            _requires_pinning: PhantomPinned,
82        }
83    }
84
85    /// Creates sender and receiver endpoints connected by reference to the pool.
86    ///
87    /// The pool will create a new event and return endpoints that reference it.
88    /// When both endpoints are dropped, the event will be automatically cleaned up.
89    ///
90    /// # Example
91    ///
92    /// ```rust
93    /// use events::OnceEventPool;
94    /// # use futures::executor::block_on;
95    ///
96    /// # block_on(async {
97    /// let pool = OnceEventPool::<i32>::new();
98    ///
99    /// // First event usage
100    /// let (sender1, receiver1) = pool.bind_by_ref();
101    /// sender1.send(42);
102    /// let value1 = receiver1.await.unwrap();
103    /// assert_eq!(value1, 42);
104    ///
105    /// // Second event usage - efficiently reuses the same underlying event
106    /// let (sender2, receiver2) = pool.bind_by_ref();
107    /// sender2.send(100);
108    /// let value2 = receiver2.await.unwrap();
109    /// assert_eq!(value2, 100);
110    /// # });
111    /// ```
112    #[must_use]
113    pub fn bind_by_ref(
114        &self,
115    ) -> (
116        PooledOnceSender<RefPool<'_, T>>,
117        PooledOnceReceiver<RefPool<'_, T>>,
118    ) {
119        let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
120
121        let inserter = inner_pool.begin_insert();
122        let key = inserter.key();
123
124        let item = inserter.insert(WithTwoOwners::new(OnceEvent::new()));
125
126        let item_ptr = NonNull::from(item.get_ref());
127
128        let pool_ref = RefPool { pool: self };
129
130        (
131            PooledOnceSender {
132                event: Some(item_ptr),
133                pool_ref: pool_ref.clone(),
134                key,
135            },
136            PooledOnceReceiver {
137                event: Some(item_ptr),
138                pool_ref,
139                key,
140            },
141        )
142    }
143
144    /// Creates sender and receiver endpoints connected by Arc to the pool.
145    ///
146    /// The pool will create a new event and return endpoints that hold Arc references.
147    /// When both endpoints are dropped, the event will be automatically cleaned up.
148    ///
149    /// # Example
150    ///
151    /// ```rust
152    /// use std::sync::Arc;
153    ///
154    /// use events::OnceEventPool;
155    ///
156    /// let pool = Arc::new(OnceEventPool::<i32>::new());
157    ///
158    /// // First usage
159    /// let (sender1, receiver1) = pool.bind_by_arc();
160    /// sender1.send(42);
161    /// let value1 = futures::executor::block_on(receiver1).unwrap();
162    /// assert_eq!(value1, 42);
163    ///
164    /// // Second usage - efficiently reuses the same pooled event
165    /// let (sender2, receiver2) = pool.bind_by_arc();
166    /// sender2.send(200);
167    /// let value2 = futures::executor::block_on(receiver2).unwrap();
168    /// assert_eq!(value2, 200);
169    /// ```
170    #[must_use]
171    pub fn bind_by_arc(
172        self: &Arc<Self>,
173    ) -> (PooledOnceSender<ArcPool<T>>, PooledOnceReceiver<ArcPool<T>>) {
174        let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
175
176        let inserter = inner_pool.begin_insert();
177        let key = inserter.key();
178
179        let item = inserter.insert(WithTwoOwners::new(OnceEvent::new()));
180
181        let item_ptr = NonNull::from(item.get_ref());
182
183        let pool_ref = ArcPool {
184            pool: Arc::clone(self),
185        };
186
187        (
188            PooledOnceSender {
189                event: Some(item_ptr),
190                pool_ref: pool_ref.clone(),
191                key,
192            },
193            PooledOnceReceiver {
194                event: Some(item_ptr),
195                pool_ref,
196                key,
197            },
198        )
199    }
200
201    /// Creates sender and receiver endpoints connected by raw pointer to the pool.
202    ///
203    /// The pool will create a new event and return endpoints that hold raw pointers.
204    /// When both endpoints are dropped, the event will be automatically cleaned up.
205    ///
206    /// # Safety
207    ///
208    /// The caller must ensure that:
209    /// - The pool remains valid and pinned for the entire lifetime of the sender and receiver
210    /// - The sender and receiver are dropped before the pool is dropped
211    ///
212    /// # Example
213    ///
214    /// ```rust
215    /// use events::OnceEventPool;
216    ///
217    /// let pool = Box::pin(OnceEventPool::<i32>::new());
218    ///
219    /// // First usage
220    /// // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
221    /// let (sender1, receiver1) = unsafe { pool.as_ref().bind_by_ptr() };
222    /// sender1.send(42);
223    /// let value1 = futures::executor::block_on(receiver1).unwrap();
224    /// assert_eq!(value1, 42);
225    ///
226    /// // Second usage - reuses the same event from the pool efficiently
227    /// // SAFETY: Pool is still valid and pinned
228    /// let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
229    /// sender2.send(100);
230    /// let value2 = futures::executor::block_on(receiver2).unwrap();
231    /// assert_eq!(value2, 100);
232    /// // Both sender and receiver pairs are dropped here, before pool
233    /// ```
234    #[must_use]
235    pub unsafe fn bind_by_ptr(
236        self: Pin<&Self>,
237    ) -> (PooledOnceSender<PtrPool<T>>, PooledOnceReceiver<PtrPool<T>>) {
238        let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
239
240        let inserter = inner_pool.begin_insert();
241        let key = inserter.key();
242
243        let item = inserter.insert(WithTwoOwners::new(OnceEvent::new()));
244
245        let item_ptr = NonNull::from(item.get_ref());
246
247        let pool_ref = PtrPool {
248            pool: NonNull::from(self.get_ref()),
249        };
250
251        (
252            PooledOnceSender {
253                event: Some(item_ptr),
254                pool_ref: pool_ref.clone(),
255                key,
256            },
257            PooledOnceReceiver {
258                event: Some(item_ptr),
259                pool_ref,
260                key,
261            },
262        )
263    }
264
265    /// Returns the number of events currently in the pool.
266    ///
267    /// This represents the count of events that are currently allocated in the pool,
268    /// including those that are currently bound to sender/receiver endpoints.
269    /// Events are removed from the pool only when both endpoints are dropped.
270    ///
271    /// # Example
272    ///
273    /// ```rust
274    /// use events::OnceEventPool;
275    ///
276    /// let pool = OnceEventPool::<i32>::new();
277    /// assert_eq!(pool.len(), 0);
278    ///
279    /// let (sender, receiver) = pool.bind_by_ref();
280    /// assert_eq!(pool.len(), 1); // Event is in pool while endpoints exist
281    ///
282    /// drop(sender);
283    /// drop(receiver);
284    /// assert_eq!(pool.len(), 0); // Event cleaned up after both endpoints dropped
285    /// ```
286    #[must_use]
287    pub fn len(&self) -> usize {
288        self.pool.lock().expect(ERR_POISONED_LOCK).len()
289    }
290
291    /// Returns whether the pool is empty.
292    ///
293    /// This is equivalent to `pool.len() == 0` but may be more efficient.
294    /// An empty pool may still have reserved capacity.
295    ///
296    /// # Example
297    ///
298    /// ```rust
299    /// use events::OnceEventPool;
300    ///
301    /// let pool = OnceEventPool::<i32>::new();
302    /// assert!(pool.is_empty());
303    ///
304    /// let (sender, receiver) = pool.bind_by_ref();
305    /// assert!(!pool.is_empty()); // Pool has event while endpoints exist
306    ///
307    /// drop(sender);
308    /// drop(receiver);
309    /// assert!(pool.is_empty()); // Pool empty after both endpoints dropped
310    /// ```
311    #[must_use]
312    pub fn is_empty(&self) -> bool {
313        self.pool.lock().expect(ERR_POISONED_LOCK).is_empty()
314    }
315
316    /// Shrinks the capacity of the pool to reduce memory usage.
317    ///
318    /// This method attempts to release unused memory by reducing the pool's capacity.
319    /// The actual reduction is implementation-dependent and may vary - some capacity
320    /// may be released, or none at all.
321    ///
322    /// # Example
323    ///
324    /// ```rust
325    /// use events::OnceEventPool;
326    ///
327    /// let pool = OnceEventPool::<i32>::new();
328    ///
329    /// // Use the pool which may grow its capacity
330    /// for _ in 0..100 {
331    ///     let (sender, receiver) = pool.bind_by_ref();
332    ///     sender.send(42);
333    ///     let _value = futures::executor::block_on(receiver);
334    /// }
335    ///
336    /// // Attempt to shrink to reduce memory usage
337    /// pool.shrink_to_fit();
338    /// ```
339    pub fn shrink_to_fit(&self) {
340        let mut inner_pool = self.pool.lock().expect("pool mutex should not be poisoned");
341        inner_pool.shrink_to_fit();
342    }
343
344    /// Uses the provided closure to inspect the backtraces of the current awaiter of each
345    /// event in the pool that is currently being awaited by someone.
346    ///
347    /// This method is only available in debug builds (`cfg(debug_assertions)`).
348    /// For any data to be present, `RUST_BACKTRACE=1` or `RUST_LIB_BACKTRACE=1` must be set.
349    ///
350    /// The closure is called once for each event in the pool that is currently being awaited by
351    /// someone.
352    #[cfg(debug_assertions)]
353    pub fn inspect_awaiters(&self, mut f: impl FnMut(&Backtrace)) {
354        let inner_pool = self.pool.lock().expect("pool mutex should not be poisoned");
355
356        for event in inner_pool.iter() {
357            event.inspect_awaiter(|bt| {
358                if let Some(bt) = bt {
359                    f(bt);
360                }
361            });
362        }
363    }
364}
365
366impl<T> Default for OnceEventPool<T>
367where
368    T: Send,
369{
370    fn default() -> Self {
371        Self::new()
372    }
373}
374
375/// Enables a sender or receiver to reference the pool that stores the event that connects them.
376///
377/// This is a sealed trait and exists for internal use only. You never need to use it.
378#[expect(private_bounds, reason = "intentional - sealed trait")]
379pub trait PoolRef<T>: Deref<Target = OnceEventPool<T>> + ReflectiveTSend + Sealed
380where
381    T: Send,
382{
383}
384
385/// An event pool referenced via `&` shared reference.
386///
387/// Only used in type names. Instances are created internally by [`OnceEventPool`].
388#[derive(Copy, Debug)]
389pub struct RefPool<'a, T>
390where
391    T: Send,
392{
393    pool: &'a OnceEventPool<T>,
394}
395
396impl<T> Sealed for RefPool<'_, T> where T: Send {}
397impl<T> PoolRef<T> for RefPool<'_, T> where T: Send {}
398impl<T> Deref for RefPool<'_, T>
399where
400    T: Send,
401{
402    type Target = OnceEventPool<T>;
403
404    fn deref(&self) -> &Self::Target {
405        self.pool
406    }
407}
408impl<T> Clone for RefPool<'_, T>
409where
410    T: Send,
411{
412    fn clone(&self) -> Self {
413        Self { pool: self.pool }
414    }
415}
416impl<T: Send> ReflectiveTSend for RefPool<'_, T> {
417    type T = T;
418}
419
420/// An event pool referenced via `Arc` shared reference.
421///
422/// Only used in type names. Instances are created internally by [`OnceEventPool`].
423#[derive(Debug)]
424pub struct ArcPool<T>
425where
426    T: Send,
427{
428    pool: Arc<OnceEventPool<T>>,
429}
430
431impl<T> Sealed for ArcPool<T> where T: Send {}
432impl<T> PoolRef<T> for ArcPool<T> where T: Send {}
433impl<T> Deref for ArcPool<T>
434where
435    T: Send,
436{
437    type Target = OnceEventPool<T>;
438
439    fn deref(&self) -> &Self::Target {
440        &self.pool
441    }
442}
443impl<T> Clone for ArcPool<T>
444where
445    T: Send,
446{
447    fn clone(&self) -> Self {
448        Self {
449            pool: Arc::clone(&self.pool),
450        }
451    }
452}
453impl<T: Send> ReflectiveTSend for ArcPool<T> {
454    type T = T;
455}
456
457/// An event pool referenced via raw pointer.
458///
459/// Only used in type names. Instances are created internally by [`OnceEventPool`].
460#[derive(Copy, Debug)]
461pub struct PtrPool<T>
462where
463    T: Send,
464{
465    pool: NonNull<OnceEventPool<T>>,
466}
467
468impl<T> Sealed for PtrPool<T> where T: Send {}
469impl<T> PoolRef<T> for PtrPool<T> where T: Send {}
470impl<T> Deref for PtrPool<T>
471where
472    T: Send,
473{
474    type Target = OnceEventPool<T>;
475
476    fn deref(&self) -> &Self::Target {
477        // SAFETY: The creator of the reference is responsible for ensuring the pool outlives it.
478        unsafe { self.pool.as_ref() }
479    }
480}
481impl<T> Clone for PtrPool<T>
482where
483    T: Send,
484{
485    fn clone(&self) -> Self {
486        Self { pool: self.pool }
487    }
488}
489impl<T: Send> ReflectiveTSend for PtrPool<T> {
490    type T = T;
491}
492// SAFETY: This is only used with the thread-safe pool (the pool is Sync).
493unsafe impl<T> Send for PtrPool<T> where T: Send {}
494
495/// A receiver that can receive a single value through a thread-safe event.
496///
497/// The type of the value is the inner type parameter,
498/// i.e. the `T` in `PooledOnceReceiver<ArcPool<T>>`.
499///
500/// The outer type parameter determines the mechanism by which the endpoint is bound to the event
501/// pool. Different binding mechanisms offer different performance characteristics and resource
502/// management patterns.
503#[derive(Debug)]
504pub struct PooledOnceSender<P>
505where
506    P: PoolRef<<P as ReflectiveTSend>::T>,
507{
508    // This is a pointer to avoid contaminating the type signature with the event lifetime.
509    //
510    // SAFETY: We rely on the inner pool guaranteeing pinning and us owning a counted reference.
511    event: Option<NonNull<WithTwoOwners<OnceEvent<P::T>>>>,
512
513    pool_ref: P,
514    key: Key,
515}
516
517impl<P> PooledOnceSender<P>
518where
519    P: PoolRef<<P as ReflectiveTSend>::T>,
520{
521    /// Sends a value through the event.
522    ///
523    /// This method consumes the sender and always succeeds, regardless of whether
524    /// there is a receiver waiting.
525    ///
526    /// # Example
527    ///
528    /// ```rust
529    /// use events::OnceEventPool;
530    ///
531    /// let pool = OnceEventPool::new();
532    /// let (sender, receiver) = pool.bind_by_ref();
533    ///
534    /// sender.send(42);
535    /// let value = futures::executor::block_on(receiver).unwrap();
536    /// assert_eq!(value, 42);
537    /// ```
538    #[inline]
539    pub fn send(self, value: P::T) {
540        // SAFETY: See comments on field.
541        let event = unsafe {
542            self.event
543                .expect("event is only None during destruction")
544                .as_ref()
545        };
546
547        event.set(value);
548    }
549}
550
551impl<P> Drop for PooledOnceSender<P>
552where
553    P: PoolRef<<P as ReflectiveTSend>::T>,
554{
555    #[inline]
556    fn drop(&mut self) {
557        // SAFETY: See comments on field.
558        let event = unsafe { self.event.expect("only possible on double drop").as_ref() };
559
560        // The event is going to be destroyed, so we cannot reference it anymore.
561        self.event = None;
562
563        // Signal that the sender was dropped before handling reference counting.
564        // This ensures receivers get Disconnected errors if the sender is dropped without sending.
565        event.sender_dropped();
566
567        if event.release_one() {
568            self.pool_ref
569                .pool
570                .lock()
571                .expect(ERR_POISONED_LOCK)
572                .remove(self.key);
573        }
574    }
575}
576
577// SAFETY: The NonNull marks it !Send by default but we know that everything behind the pointer
578// is thread-safe, so all is well. We also require `Send` from `R` to be extra safe here.
579unsafe impl<P> Send for PooledOnceSender<P> where P: PoolRef<<P as ReflectiveTSend>::T> + Send {}
580
581/// A receiver that can receive a single value through a thread-safe event.
582///
583/// The type of the value is the inner type parameter,
584/// i.e. the `T` in `PooledOnceReceiver<ArcPool<T>>`.
585///
586/// The outer type parameter determines the mechanism by which the endpoint is bound to the event
587/// pool. Different binding mechanisms offer different performance characteristics and resource
588/// management patterns.
589#[derive(Debug)]
590pub struct PooledOnceReceiver<P>
591where
592    P: PoolRef<<P as ReflectiveTSend>::T>,
593{
594    // This is a pointer to avoid contaminating the type signature with the event lifetime.
595    //
596    // SAFETY: We rely on the inner pool guaranteeing pinning and us owning a counted reference.
597    event: Option<NonNull<WithTwoOwners<OnceEvent<P::T>>>>,
598
599    pool_ref: P,
600    key: Key,
601}
602
603impl<P> PooledOnceReceiver<P>
604where
605    P: PoolRef<<P as ReflectiveTSend>::T>,
606{
607    /// Drops the inner state, releasing the event back to the pool.
608    /// May also be used from contexts where the receiver itself is not yet consumed.
609    fn drop_inner(&mut self) {
610        let Some(event) = self.event else {
611            // Already pseudo-consumed the receiver as part of the Future impl.
612            return;
613        };
614
615        // Regardless of whether we were the last reference holder or not, we are no longer
616        // allowed to reference the event as we are releasing our reference.
617        self.event = None;
618
619        // SAFETY: See comments on field.
620        let event = unsafe { event.as_ref() };
621
622        if event.release_one() {
623            self.pool_ref
624                .pool
625                .lock()
626                .expect(ERR_POISONED_LOCK)
627                .remove(self.key);
628        }
629    }
630}
631
632impl<P> Future for PooledOnceReceiver<P>
633where
634    P: PoolRef<<P as ReflectiveTSend>::T>,
635{
636    type Output = Result<P::T, Disconnected>;
637
638    #[inline]
639    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
640        // SAFETY: We are not moving anything, just touching internal state.
641        let this = unsafe { self.get_unchecked_mut() };
642
643        // SAFETY: See comments on field.
644        let event = unsafe {
645            this.event
646                .expect("polling a Future after completion is invalid")
647                .as_ref()
648        };
649
650        let poll_result = event.poll(cx.waker());
651
652        poll_result.map_or_else(
653            || task::Poll::Pending,
654            |value| {
655                this.drop_inner();
656                task::Poll::Ready(value)
657            },
658        )
659    }
660}
661
662impl<P> Drop for PooledOnceReceiver<P>
663where
664    P: PoolRef<<P as ReflectiveTSend>::T>,
665{
666    #[inline]
667    fn drop(&mut self) {
668        self.drop_inner();
669    }
670}
671
672// SAFETY: The NonNull marks it !Send by default but we know that everything behind the pointer
673// is thread-safe, so all is well. We also require `Send` from `R` to be extra safe here.
674unsafe impl<P> Send for PooledOnceReceiver<P> where P: PoolRef<<P as ReflectiveTSend>::T> + Send {}
675
676#[cfg(test)]
677mod tests {
678    use std::pin::pin;
679
680    use futures::task::noop_waker_ref;
681    use static_assertions::{assert_impl_all, assert_not_impl_any};
682    use testing::with_watchdog;
683
684    use super::*;
685
686    #[test]
687    fn event_pool_by_ref() {
688        with_watchdog(|| {
689            let pool = OnceEventPool::<i32>::new();
690
691            // Pool starts empty
692            assert_eq!(pool.len(), 0);
693            assert!(pool.is_empty());
694
695            let (sender, receiver) = pool.bind_by_ref();
696
697            // Pool should have 1 event while endpoints are bound
698            assert_eq!(pool.len(), 1);
699            assert!(!pool.is_empty());
700
701            sender.send(42);
702            let value = futures::executor::block_on(receiver).unwrap();
703            assert_eq!(value, 42);
704
705            // After endpoints are dropped, pool should be empty
706            assert_eq!(pool.len(), 0);
707            assert!(pool.is_empty());
708        });
709    }
710
711    #[test]
712    fn pool_drop_cleanup() {
713        with_watchdog(|| {
714            let pool = OnceEventPool::<i32>::new();
715
716            // Create and drop sender/receiver without using them
717            let (sender, receiver) = pool.bind_by_ref();
718            drop(sender);
719            drop(receiver);
720
721            // Pool should be empty (the event should have been cleaned up)
722            // This is implementation detail but shows the cleanup works
723        });
724    }
725
726    #[test]
727    fn pool_multiple_events() {
728        with_watchdog(|| {
729            let pool = OnceEventPool::<i32>::new();
730
731            // Test one event first
732            let (sender1, receiver1) = pool.bind_by_ref();
733            sender1.send(1);
734            let value1 = futures::executor::block_on(receiver1).unwrap();
735            assert_eq!(value1, 1);
736
737            // Test another event
738            let (sender2, receiver2) = pool.bind_by_ref();
739            sender2.send(2);
740            let value2 = futures::executor::block_on(receiver2).unwrap();
741            assert_eq!(value2, 2);
742        });
743    }
744
745    #[test]
746    fn event_pool_by_arc() {
747        with_watchdog(|| {
748            let pool = Arc::new(OnceEventPool::<i32>::new());
749
750            // Pool starts empty
751            assert_eq!(pool.len(), 0);
752            assert!(pool.is_empty());
753
754            let (sender, receiver) = pool.bind_by_arc();
755
756            // Pool should have 1 event while endpoints are bound
757            assert_eq!(pool.len(), 1);
758            assert!(!pool.is_empty());
759
760            sender.send(42);
761            let value = futures::executor::block_on(receiver).unwrap();
762            assert_eq!(value, 42);
763
764            // After endpoints are dropped, pool should be empty
765            assert_eq!(pool.len(), 0);
766            assert!(pool.is_empty());
767        });
768    }
769
770    #[test]
771    fn event_pool_by_ptr() {
772        with_watchdog(|| {
773            let pool = Box::pin(OnceEventPool::<i32>::new());
774
775            // Pool starts empty
776            assert_eq!(pool.len(), 0);
777            assert!(pool.is_empty());
778
779            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
780            let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
781
782            // Pool should have 1 event while endpoints are bound
783            assert_eq!(pool.len(), 1);
784            assert!(!pool.is_empty());
785
786            sender.send(42);
787            let value = futures::executor::block_on(receiver).unwrap();
788            assert_eq!(value, 42);
789
790            // After endpoints are dropped, pool should be empty
791            assert_eq!(pool.len(), 0);
792            assert!(pool.is_empty());
793        });
794    }
795
796    // Memory leak detection tests - these specifically test that cleanup occurs on drop
797    #[test]
798    fn by_ref_sender_drop_cleanup() {
799        with_watchdog(|| {
800            let pool = OnceEventPool::<i32>::new();
801            {
802                let (sender, _receiver) = pool.bind_by_ref();
803
804                // Force the sender to be dropped without being consumed by send()
805                drop(sender);
806                // Receiver will be dropped at end of scope
807            }
808
809            // Create a new event to verify the pool is still functional
810            let (sender2, receiver2) = pool.bind_by_ref();
811            sender2.send(123);
812            let value = futures::executor::block_on(receiver2).unwrap();
813            assert_eq!(value, 123);
814        });
815    }
816
817    #[test]
818    fn by_ref_receiver_drop_cleanup() {
819        with_watchdog(|| {
820            let pool = OnceEventPool::<i32>::new();
821            {
822                let (_sender, receiver) = pool.bind_by_ref();
823
824                // Force the receiver to be dropped without being consumed by recv()
825                drop(receiver);
826                // Sender will be dropped at end of scope
827            }
828
829            // Create a new event to verify the pool is still functional
830            let (sender2, receiver2) = pool.bind_by_ref();
831            sender2.send(456);
832            let value = futures::executor::block_on(receiver2).unwrap();
833            assert_eq!(value, 456);
834        });
835    }
836
837    #[test]
838    fn by_arc_sender_drop_cleanup() {
839        with_watchdog(|| {
840            let pool = Arc::new(OnceEventPool::<i32>::new());
841            let (sender, _receiver) = pool.bind_by_arc();
842
843            // Force the sender to be dropped without being consumed by send()
844            drop(sender);
845
846            // Create a new event to verify the pool is still functional
847            let (sender2, receiver2) = pool.bind_by_arc();
848            sender2.send(654);
849            let value = futures::executor::block_on(receiver2).unwrap();
850            assert_eq!(value, 654);
851        });
852    }
853
854    #[test]
855    fn by_arc_receiver_drop_cleanup() {
856        with_watchdog(|| {
857            let pool = Arc::new(OnceEventPool::<i32>::new());
858            let (_sender, receiver) = pool.bind_by_arc();
859
860            // Force the receiver to be dropped without being consumed by recv()
861            drop(receiver);
862
863            // Create a new event to verify the pool is still functional
864            let (sender2, receiver2) = pool.bind_by_arc();
865            sender2.send(987);
866            let value = futures::executor::block_on(receiver2).unwrap();
867            assert_eq!(value, 987);
868        });
869    }
870
871    #[test]
872    fn by_ptr_sender_drop_cleanup() {
873        with_watchdog(|| {
874            let pool = Box::pin(OnceEventPool::<i32>::new());
875
876            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
877            let (sender, _receiver) = unsafe { pool.as_ref().bind_by_ptr() };
878
879            // Force the sender to be dropped without being consumed by send()
880            drop(sender);
881
882            // Create a new event to verify the pool is still functional
883            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
884            let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
885            sender2.send(147);
886            let value = futures::executor::block_on(receiver2).unwrap();
887            assert_eq!(value, 147);
888        });
889    }
890
891    #[test]
892    fn by_ptr_receiver_drop_cleanup() {
893        with_watchdog(|| {
894            let pool = Box::pin(OnceEventPool::<i32>::new());
895
896            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
897            let (_sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
898
899            // Force the receiver to be dropped without being consumed by recv()
900            drop(receiver);
901
902            // Create a new event to verify the pool is still functional
903            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
904            let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
905            sender2.send(258);
906            let value = futures::executor::block_on(receiver2).unwrap();
907            assert_eq!(value, 258);
908        });
909    }
910
911    #[test]
912    fn dec_ref_and_cleanup_is_called() {
913        with_watchdog(|| {
914            let pool = OnceEventPool::<i32>::new();
915
916            // Create multiple events and drop them without using
917            for _ in 0..5 {
918                let (sender, receiver) = pool.bind_by_ref();
919                drop(sender);
920                drop(receiver);
921            }
922
923            // Verify pool still works correctly after cleanup
924            let (sender, receiver) = pool.bind_by_ref();
925            sender.send(999);
926            let value = futures::executor::block_on(receiver).unwrap();
927            assert_eq!(value, 999);
928        });
929    }
930
931    #[test]
932    fn pool_cleanup_verified_by_capacity() {
933        with_watchdog(|| {
934            let pool = OnceEventPool::<i32>::new();
935
936            // Create many events and drop them without using - this should not grow the pool permanently
937            for i in 0..10 {
938                let (sender, receiver) = pool.bind_by_ref();
939                if i % 2 == 0 {
940                    drop(sender);
941                    drop(receiver);
942                } else {
943                    // Use some events normally
944                    sender.send(i);
945                    let _value = futures::executor::block_on(receiver);
946                }
947            }
948
949            // The pool should have cleaned up unused events
950            // If cleanup is broken, the pool would retain all the unused events
951            // This is a bit of an implementation detail but it's necessary to catch the leak
952
953            // Create one more event to verify pool still works
954            let (sender, receiver) = pool.bind_by_ref();
955            sender.send(42);
956            let value = futures::executor::block_on(receiver).unwrap();
957            assert_eq!(value, 42);
958        });
959    }
960
961    #[test]
962    fn pool_stress_test_no_leak() {
963        with_watchdog(|| {
964            let pool = OnceEventPool::<u64>::new();
965
966            // Stress test with many dropped events
967            for _ in 0..100 {
968                let (sender, receiver) = pool.bind_by_ref();
969                // Drop without using
970                drop(sender);
971                drop(receiver);
972            }
973
974            // Pool should still work efficiently
975            let (sender, receiver) = pool.bind_by_ref();
976            sender.send(999);
977            let value = futures::executor::block_on(receiver).unwrap();
978            assert_eq!(value, 999);
979        });
980    }
981
982    #[test]
983    fn by_ref_drop_actually_cleans_up_pool() {
984        let pool = OnceEventPool::<u32>::new();
985
986        // Create many events but drop them without use
987        for _ in 0..100 {
988            let (_sender, _receiver) = pool.bind_by_ref();
989            // Both sender and receiver will be dropped here
990        }
991
992        // Pool should be cleaned up - all events should be removed
993        // If Drop implementations don't work, pool will retain unused events
994        let mut pool_guard = pool.pool.lock().unwrap();
995        assert_eq!(
996            pool_guard.len(),
997            0,
998            "Pool still contains unused events - Drop implementations not working"
999        );
1000
1001        // An empty pool should be able to shrink to capacity 0
1002        pool_guard.shrink_to_fit();
1003        assert_eq!(
1004            pool_guard.capacity(),
1005            0,
1006            "Empty pool should shrink to capacity 0"
1007        );
1008    }
1009
1010    #[test]
1011    fn by_arc_drop_actually_cleans_up_pool() {
1012        let pool = Arc::new(OnceEventPool::<u32>::new());
1013
1014        // Create many events but drop them without use
1015        for _ in 0..100 {
1016            let (_sender, _receiver) = pool.bind_by_arc();
1017            // Both sender and receiver will be dropped here
1018        }
1019
1020        // Pool should be cleaned up - all events should be removed
1021        let mut pool_guard = pool.pool.lock().unwrap();
1022        assert_eq!(
1023            pool_guard.len(),
1024            0,
1025            "Pool still contains unused events - Drop implementations not working"
1026        );
1027
1028        // An empty pool should be able to shrink to capacity 0
1029        pool_guard.shrink_to_fit();
1030        assert_eq!(
1031            pool_guard.capacity(),
1032            0,
1033            "Empty pool should shrink to capacity 0"
1034        );
1035    }
1036
1037    #[test]
1038    fn by_ptr_drop_actually_cleans_up_pool() {
1039        // Test ptr-based pooled events cleanup by checking pool state
1040        for iteration in 0..10 {
1041            {
1042                let pool = Box::pin(OnceEventPool::<u32>::new());
1043                // SAFETY: We pin the pool for the duration of by_ptr call
1044                let (_sender, _receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1045                // sender and receiver will be dropped here
1046            }
1047
1048            // For this test, we'll verify that repeated operations don't accumulate
1049            // If Drop implementations don't work, we'd see memory accumulation
1050            println!("Iteration {iteration}: Pool operations completed");
1051        }
1052    }
1053
1054    #[test]
1055    fn dec_ref_and_cleanup_actually_removes_events() {
1056        let pool = OnceEventPool::<u32>::new();
1057
1058        // Test 1: Check that events are added to pool
1059        let pool_len_before = {
1060            let pool_guard = pool.pool.lock().unwrap();
1061            pool_guard.len()
1062        };
1063
1064        // Create events in a scope to ensure they're dropped
1065        let sender_key = {
1066            let (sender, receiver) = pool.bind_by_ref();
1067            let key = sender.key;
1068
1069            // Events should be in pool now (don't check len while borrowed)
1070
1071            drop(sender);
1072            drop(receiver);
1073            key
1074        };
1075
1076        // Now check that cleanup worked
1077        let pool_len_after = {
1078            let pool_guard = pool.pool.lock().unwrap();
1079            pool_guard.len()
1080        };
1081        assert_eq!(
1082            pool_len_after, pool_len_before,
1083            "Pool not cleaned up after dropping events - dec_ref_and_cleanup not working, \
1084             key: {sender_key:?}"
1085        );
1086    }
1087
1088    #[test]
1089    fn shrink_to_fit_with_empty_pool_shrinks_to_zero() {
1090        let pool = OnceEventPool::<u32>::new();
1091
1092        // Create and drop events without using them
1093        for _ in 0..10 {
1094            drop(pool.bind_by_ref());
1095        }
1096
1097        assert_eq!(pool.len(), 0);
1098        assert!(pool.is_empty());
1099
1100        // Shrink the pool to fit
1101        pool.shrink_to_fit();
1102
1103        assert_eq!(
1104            pool.pool.lock().unwrap().capacity(),
1105            0,
1106            "Empty pool should shrink to capacity 0"
1107        );
1108    }
1109
1110    #[test]
1111    fn event_removed_from_pool_after_endpoints_immediate_drop() {
1112        let pool = OnceEventPool::<u32>::new();
1113
1114        drop(pool.bind_by_ref());
1115
1116        assert_eq!(pool.len(), 0);
1117        assert!(pool.is_empty());
1118    }
1119
1120    #[test]
1121    fn pool_len_and_is_empty_methods() {
1122        let pool = OnceEventPool::<u32>::new();
1123
1124        // Initially empty
1125        assert_eq!(pool.len(), 0);
1126        assert!(pool.is_empty());
1127
1128        // Create first event
1129        let (sender1, receiver1) = pool.bind_by_ref();
1130        assert_eq!(pool.len(), 1);
1131        assert!(!pool.is_empty());
1132
1133        // Create second event while first is still bound
1134        let (sender2, receiver2) = pool.bind_by_ref();
1135        assert_eq!(pool.len(), 2);
1136        assert!(!pool.is_empty());
1137
1138        // Drop first event endpoints
1139        drop(sender1);
1140        drop(receiver1);
1141        assert_eq!(pool.len(), 1);
1142        assert!(!pool.is_empty());
1143
1144        // Drop second event endpoints
1145        drop(sender2);
1146        drop(receiver2);
1147        assert_eq!(pool.len(), 0);
1148        assert!(pool.is_empty());
1149    }
1150
1151    #[test]
1152    fn pooled_event_receiver_gets_disconnected_when_sender_dropped() {
1153        with_watchdog(|| {
1154            futures::executor::block_on(async {
1155                let pool = OnceEventPool::<i32>::new();
1156                let (sender, receiver) = pool.bind_by_ref();
1157
1158                // Drop the sender without sending anything
1159                drop(sender);
1160
1161                // Receiver should get a Disconnected error
1162                let result = receiver.await;
1163                assert!(result.is_err());
1164                assert!(matches!(result, Err(Disconnected)));
1165            });
1166        });
1167    }
1168
1169    #[test]
1170    fn pooled_event_by_arc_receiver_gets_disconnected_when_sender_dropped() {
1171        with_watchdog(|| {
1172            futures::executor::block_on(async {
1173                let pool = Arc::new(OnceEventPool::<i32>::new());
1174                let (sender, receiver) = pool.bind_by_arc();
1175
1176                // Drop the sender without sending anything
1177                drop(sender);
1178
1179                // Receiver should get a Disconnected error
1180                let result = receiver.await;
1181                assert!(result.is_err());
1182                assert!(matches!(result, Err(Disconnected)));
1183            });
1184        });
1185    }
1186
1187    #[test]
1188    fn pooled_event_by_ptr_receiver_gets_disconnected_when_sender_dropped() {
1189        with_watchdog(|| {
1190            futures::executor::block_on(async {
1191                let pool = Box::pin(OnceEventPool::<i32>::new());
1192
1193                // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1194                let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1195
1196                // Drop the sender without sending anything
1197                drop(sender);
1198
1199                // Receiver should get a Disconnected error
1200                let result = receiver.await;
1201                assert!(result.is_err());
1202                assert!(matches!(result, Err(Disconnected)));
1203            });
1204        });
1205    }
1206
1207    #[test]
1208    fn pooled_sender_dropped_when_awaiting_signals_disconnected() {
1209        let pool = OnceEventPool::<i32>::new();
1210        let (sender, receiver) = pool.bind_by_ref();
1211
1212        let mut receiver = pin!(receiver);
1213        let mut context = task::Context::from_waker(noop_waker_ref());
1214        assert!(matches!(
1215            receiver.as_mut().poll(&mut context),
1216            task::Poll::Pending
1217        ));
1218
1219        drop(sender);
1220
1221        let mut context = task::Context::from_waker(noop_waker_ref());
1222        assert!(matches!(
1223            receiver.as_mut().poll(&mut context),
1224            task::Poll::Ready(Err(Disconnected))
1225        ));
1226    }
1227
1228    #[cfg(debug_assertions)]
1229    #[test]
1230    fn inspect_awaiters_empty_pool() {
1231        let pool = OnceEventPool::<i32>::new();
1232
1233        let mut count = 0;
1234        pool.inspect_awaiters(|_| {
1235            count += 1;
1236        });
1237
1238        assert_eq!(count, 0);
1239    }
1240
1241    #[cfg(debug_assertions)]
1242    #[test]
1243    fn inspect_awaiters_no_awaiters() {
1244        let pool = OnceEventPool::<String>::new();
1245
1246        // Create some events but don't await them
1247        let (_sender1, _receiver1) = pool.bind_by_ref();
1248        let (_sender2, _receiver2) = pool.bind_by_ref();
1249
1250        let mut count = 0;
1251        pool.inspect_awaiters(|_| {
1252            count += 1;
1253        });
1254
1255        assert_eq!(count, 0);
1256    }
1257
1258    #[cfg(debug_assertions)]
1259    #[test]
1260    fn inspect_awaiters_with_awaiters() {
1261        let pool = OnceEventPool::<i32>::new();
1262
1263        // Create events and start awaiting them
1264        let (_sender1, receiver1) = pool.bind_by_ref();
1265        let (_sender2, receiver2) = pool.bind_by_ref();
1266
1267        let mut context = task::Context::from_waker(noop_waker_ref());
1268        let mut pinned_receiver1 = pin!(receiver1);
1269        let mut pinned_receiver2 = pin!(receiver2);
1270
1271        // Poll both receivers to create awaiters
1272        let _poll1 = pinned_receiver1.as_mut().poll(&mut context);
1273        let _poll2 = pinned_receiver2.as_mut().poll(&mut context);
1274
1275        let mut count = 0;
1276        pool.inspect_awaiters(|_backtrace| {
1277            count += 1;
1278        });
1279
1280        assert_eq!(count, 2);
1281    }
1282
1283    #[cfg(debug_assertions)]
1284    #[test]
1285    fn inspect_awaiters_mixed_states() {
1286        let pool = OnceEventPool::<String>::new();
1287
1288        // Create multiple events in different states
1289        let (_sender1, receiver1) = pool.bind_by_ref();
1290        let (sender2, receiver2) = pool.bind_by_ref();
1291        let (_sender3, receiver3) = pool.bind_by_ref();
1292
1293        // Only poll receiver1 and receiver3
1294        let mut context = task::Context::from_waker(noop_waker_ref());
1295        let mut pinned_receiver1 = pin!(receiver1);
1296        let mut pinned_receiver3 = pin!(receiver3);
1297
1298        let _poll1 = pinned_receiver1.as_mut().poll(&mut context);
1299        let _poll3 = pinned_receiver3.as_mut().poll(&mut context);
1300
1301        // Complete sender2 without polling its receiver
1302        sender2.send("completed".to_string());
1303        drop(receiver2);
1304
1305        let mut count = 0;
1306        pool.inspect_awaiters(|_backtrace| {
1307            count += 1;
1308        });
1309
1310        // Should only count the two that are actually awaiting
1311        assert_eq!(count, 2);
1312    }
1313
1314    #[test]
1315    fn thread_safety() {
1316        // The pool is accessed across threads, so requires Sync as well as Send.
1317        assert_impl_all!(OnceEventPool<u32>: Send, Sync);
1318
1319        // These are all meant to be consumed locally - they may move between threads but are
1320        // not shared between threads, so Sync is not expected, only Send.
1321        assert_impl_all!(PooledOnceSender<RefPool<'static, u32>>: Send);
1322        assert_impl_all!(PooledOnceReceiver<RefPool<'static, u32>>: Send);
1323        assert_impl_all!(PooledOnceSender<ArcPool<u32>>: Send);
1324        assert_impl_all!(PooledOnceReceiver<ArcPool<u32>>: Send);
1325        assert_impl_all!(PooledOnceSender<PtrPool<u32>>: Send);
1326        assert_impl_all!(PooledOnceReceiver<PtrPool<u32>>: Send);
1327        assert_not_impl_any!(PooledOnceSender<RefPool<'static, u32>>: Sync);
1328        assert_not_impl_any!(PooledOnceReceiver<RefPool<'static, u32>>: Sync);
1329        assert_not_impl_any!(PooledOnceSender<ArcPool<u32>>: Sync);
1330        assert_not_impl_any!(PooledOnceReceiver<ArcPool<u32>>: Sync);
1331        assert_not_impl_any!(PooledOnceSender<PtrPool<u32>>: Sync);
1332        assert_not_impl_any!(PooledOnceReceiver<PtrPool<u32>>: Sync);
1333    }
1334}