events/once/
pooled_sync.rs

1//! Pooled events that provide automatic resource management.
2//!
3//! This module provides pooled variants of events that automatically manage their lifecycle
4//! using reference counting. Events are created from pools and automatically returned to the
5//! pool when both sender and receiver are dropped.
6
7#[cfg(debug_assertions)]
8use std::backtrace::Backtrace;
9use std::fmt::Debug;
10use std::future::Future;
11use std::marker::PhantomPinned;
12use std::mem::ManuallyDrop;
13use std::ops::Deref;
14use std::pin::Pin;
15use std::ptr::{self, NonNull};
16use std::sync::{Arc, Mutex};
17use std::{any, fmt, task};
18
19use infinity_pool::{RawPinnedPool, RawPooled};
20
21use crate::{Disconnected, ERR_POISONED_LOCK, OnceEvent, ReflectiveTSend, Sealed};
22
23/// A pool that manages thread-safe events with automatic cleanup.
24///
25/// The pool creates events on demand and automatically cleans them up when both
26/// sender and receiver endpoints are dropped.
27///
28/// This pool provides zero-allocation event reuse for high-frequency eventing scenarios
29/// in a thread-safe manner.
30///
31/// # Example
32///
33/// ```rust
34/// use events::OnceEventPool;
35/// # use futures::executor::block_on;
36///
37/// # block_on(async {
38/// let pool = OnceEventPool::<i32>::new();
39///
40/// // First usage - creates new event
41/// let (sender1, receiver1) = pool.bind_by_ref();
42/// sender1.send(42);
43/// let value1 = receiver1.await.unwrap();
44/// assert_eq!(value1, 42);
45/// // Event returned to pool when sender1/receiver1 are dropped
46///
47/// // Second usage - reuses the same event instance (efficient!)
48/// let (sender2, receiver2) = pool.bind_by_ref();
49/// sender2.send(100);
50/// let value2 = receiver2.await.unwrap();
51/// assert_eq!(value2, 100);
52/// // Same event reused - no additional allocation overhead
53/// # });
54/// ```
55pub struct OnceEventPool<T>
56where
57    T: Send,
58{
59    pool: Mutex<RawPinnedPool<OnceEvent<T>>>,
60
61    // It is invalid to move this type once it has been pinned.
62    _requires_pinning: PhantomPinned,
63}
64
65impl<T> Debug for OnceEventPool<T>
66where
67    T: Send,
68{
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        f.debug_struct("OnceEventPool")
71            .field("item_type", &format_args!("{}", any::type_name::<T>()))
72            .finish_non_exhaustive()
73    }
74}
75
76impl<T> OnceEventPool<T>
77where
78    T: Send,
79{
80    /// Creates a new empty event pool.
81    ///
82    /// # Example
83    ///
84    /// ```rust
85    /// use events::OnceEventPool;
86    ///
87    /// let pool = OnceEventPool::<String>::new();
88    /// ```
89    #[must_use]
90    pub fn new() -> Self {
91        Self {
92            pool: Mutex::new(RawPinnedPool::new()),
93            _requires_pinning: PhantomPinned,
94        }
95    }
96
97    /// Creates sender and receiver endpoints connected by reference to the pool.
98    ///
99    /// The pool will create a new event and return endpoints that reference it.
100    /// When both endpoints are dropped, the event will be automatically cleaned up.
101    ///
102    /// # Example
103    ///
104    /// ```rust
105    /// use events::OnceEventPool;
106    /// # use futures::executor::block_on;
107    ///
108    /// # block_on(async {
109    /// let pool = OnceEventPool::<i32>::new();
110    ///
111    /// // First event usage
112    /// let (sender1, receiver1) = pool.bind_by_ref();
113    /// sender1.send(42);
114    /// let value1 = receiver1.await.unwrap();
115    /// assert_eq!(value1, 42);
116    ///
117    /// // Second event usage - efficiently reuses the same underlying event
118    /// let (sender2, receiver2) = pool.bind_by_ref();
119    /// sender2.send(100);
120    /// let value2 = receiver2.await.unwrap();
121    /// assert_eq!(value2, 100);
122    /// # });
123    /// ```
124    #[must_use]
125    pub fn bind_by_ref(
126        &self,
127    ) -> (
128        PooledOnceSender<RefPool<'_, T>>,
129        PooledOnceReceiver<RefPool<'_, T>>,
130    ) {
131        let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
132
133        // SAFETY: We rely on OnceEvent::new_in_place_bound() for correct initialization.
134        let handle = unsafe { inner_pool.insert_with(OnceEvent::new_in_place_bound) };
135
136        let pool_ref = RefPool { pool: self };
137
138        let shared_handle = handle.into_shared();
139
140        (
141            PooledOnceSender {
142                pool_ref: pool_ref.clone(),
143                event: shared_handle,
144            },
145            PooledOnceReceiver {
146                pool_ref,
147                event: Some(shared_handle),
148            },
149        )
150    }
151
152    /// Creates sender and receiver endpoints connected by Arc to the pool.
153    ///
154    /// The pool will create a new event and return endpoints that hold Arc references.
155    /// When both endpoints are dropped, the event will be automatically cleaned up.
156    ///
157    /// # Example
158    ///
159    /// ```rust
160    /// use std::sync::Arc;
161    ///
162    /// use events::OnceEventPool;
163    ///
164    /// let pool = Arc::new(OnceEventPool::<i32>::new());
165    ///
166    /// // First usage
167    /// let (sender1, receiver1) = pool.bind_by_arc();
168    /// sender1.send(42);
169    /// let value1 = futures::executor::block_on(receiver1).unwrap();
170    /// assert_eq!(value1, 42);
171    ///
172    /// // Second usage - efficiently reuses the same pooled event
173    /// let (sender2, receiver2) = pool.bind_by_arc();
174    /// sender2.send(200);
175    /// let value2 = futures::executor::block_on(receiver2).unwrap();
176    /// assert_eq!(value2, 200);
177    /// ```
178    #[must_use]
179    pub fn bind_by_arc(
180        self: &Arc<Self>,
181    ) -> (PooledOnceSender<ArcPool<T>>, PooledOnceReceiver<ArcPool<T>>) {
182        let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
183
184        // SAFETY: We rely on OnceEvent::new_in_place_bound() for correct initialization.
185        let handle = unsafe { inner_pool.insert_with(OnceEvent::new_in_place_bound) };
186
187        let pool_ref = ArcPool {
188            pool: Arc::clone(self),
189        };
190
191        let shared_handle = handle.into_shared();
192
193        (
194            PooledOnceSender {
195                pool_ref: pool_ref.clone(),
196                event: shared_handle,
197            },
198            PooledOnceReceiver {
199                pool_ref,
200                event: Some(shared_handle),
201            },
202        )
203    }
204
205    /// Creates sender and receiver endpoints connected by raw pointer to the pool.
206    ///
207    /// The pool will create a new event and return endpoints that hold raw pointers.
208    /// When both endpoints are dropped, the event will be automatically cleaned up.
209    ///
210    /// # Safety
211    ///
212    /// The caller must ensure that:
213    /// - The pool remains valid and pinned for the entire lifetime of the sender and receiver
214    /// - The sender and receiver are dropped before the pool is dropped
215    ///
216    /// # Example
217    ///
218    /// ```rust
219    /// use events::OnceEventPool;
220    ///
221    /// let pool = Box::pin(OnceEventPool::<i32>::new());
222    ///
223    /// // First usage
224    /// // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
225    /// let (sender1, receiver1) = unsafe { pool.as_ref().bind_by_ptr() };
226    /// sender1.send(42);
227    /// let value1 = futures::executor::block_on(receiver1).unwrap();
228    /// assert_eq!(value1, 42);
229    ///
230    /// // Second usage - reuses the same event from the pool efficiently
231    /// // SAFETY: Pool is still valid and pinned
232    /// let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
233    /// sender2.send(100);
234    /// let value2 = futures::executor::block_on(receiver2).unwrap();
235    /// assert_eq!(value2, 100);
236    /// // Both sender and receiver pairs are dropped here, before pool
237    /// ```
238    #[must_use]
239    pub unsafe fn bind_by_ptr(
240        self: Pin<&Self>,
241    ) -> (PooledOnceSender<PtrPool<T>>, PooledOnceReceiver<PtrPool<T>>) {
242        let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
243
244        // SAFETY: We rely on OnceEvent::new_in_place_bound() for correct initialization.
245        let handle = unsafe { inner_pool.insert_with(OnceEvent::new_in_place_bound) };
246
247        let pool_ref = PtrPool {
248            pool: NonNull::from(self.get_ref()),
249        };
250
251        let shared_handle = handle.into_shared();
252
253        (
254            PooledOnceSender {
255                pool_ref: pool_ref.clone(),
256                event: shared_handle,
257            },
258            PooledOnceReceiver {
259                pool_ref,
260                event: Some(shared_handle),
261            },
262        )
263    }
264
265    /// Returns the number of events currently in the pool.
266    ///
267    /// This represents the count of events that are currently allocated in the pool,
268    /// including those that are currently bound to sender/receiver endpoints.
269    /// Events are removed from the pool only when both endpoints are dropped.
270    ///
271    /// # Example
272    ///
273    /// ```rust
274    /// use events::OnceEventPool;
275    ///
276    /// let pool = OnceEventPool::<i32>::new();
277    /// assert_eq!(pool.len(), 0);
278    ///
279    /// let (sender, receiver) = pool.bind_by_ref();
280    /// assert_eq!(pool.len(), 1); // Event is in pool while endpoints exist
281    ///
282    /// drop(sender);
283    /// drop(receiver);
284    /// assert_eq!(pool.len(), 0); // Event cleaned up after both endpoints dropped
285    /// ```
286    #[must_use]
287    pub fn len(&self) -> usize {
288        self.pool.lock().expect(ERR_POISONED_LOCK).len()
289    }
290
291    /// Returns whether the pool is empty.
292    ///
293    /// This is equivalent to `pool.len() == 0` but may be more efficient.
294    /// An empty pool may still have reserved capacity.
295    ///
296    /// # Example
297    ///
298    /// ```rust
299    /// use events::OnceEventPool;
300    ///
301    /// let pool = OnceEventPool::<i32>::new();
302    /// assert!(pool.is_empty());
303    ///
304    /// let (sender, receiver) = pool.bind_by_ref();
305    /// assert!(!pool.is_empty()); // Pool has event while endpoints exist
306    ///
307    /// drop(sender);
308    /// drop(receiver);
309    /// assert!(pool.is_empty()); // Pool empty after both endpoints dropped
310    /// ```
311    #[must_use]
312    pub fn is_empty(&self) -> bool {
313        self.pool.lock().expect(ERR_POISONED_LOCK).is_empty()
314    }
315
316    /// Shrinks the capacity of the pool to reduce memory usage.
317    ///
318    /// This method attempts to release unused memory by reducing the pool's capacity.
319    /// The actual reduction is implementation-dependent and may vary - some capacity
320    /// may be released, or none at all.
321    ///
322    /// # Example
323    ///
324    /// ```rust
325    /// use events::OnceEventPool;
326    ///
327    /// let pool = OnceEventPool::<i32>::new();
328    ///
329    /// // Use the pool which may grow its capacity
330    /// for _ in 0..100 {
331    ///     let (sender, receiver) = pool.bind_by_ref();
332    ///     sender.send(42);
333    ///     let _value = futures::executor::block_on(receiver);
334    /// }
335    ///
336    /// // Attempt to shrink to reduce memory usage
337    /// pool.shrink_to_fit();
338    /// ```
339    pub fn shrink_to_fit(&self) {
340        let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
341        inner_pool.shrink_to_fit();
342    }
343
344    /// Uses the provided closure to inspect the backtraces of the most recent awaiter of each
345    /// event in the pool (or `None` if it has never been awaited).
346    ///
347    /// This method is only available in debug builds (`cfg(debug_assertions)`).
348    /// For any data to be present, `RUST_BACKTRACE=1` or `RUST_LIB_BACKTRACE=1` must be set.
349    ///
350    /// The closure is called once for each event in the pool that is currently being awaited by
351    /// someone.
352    #[cfg(debug_assertions)]
353    pub fn inspect_awaiters(&self, mut f: impl FnMut(Option<&Backtrace>)) {
354        let inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
355
356        for event_ptr in inner_pool.iter() {
357            // SAFETY: The pool remains alive for the duration of this function call, satisfying
358            // the lifetime requirement. The pointer is valid as it comes from the pool's iterator.
359            // We only ever create shared references to the events, so no conflicting exclusive
360            // references can exist.
361            let event = unsafe { event_ptr.as_ref() };
362            event.inspect_awaiter(&mut f);
363        }
364    }
365}
366
367impl<T> Default for OnceEventPool<T>
368where
369    T: Send,
370{
371    fn default() -> Self {
372        Self::new()
373    }
374}
375
376/// Enables a sender or receiver to reference the pool that stores the event that connects them.
377///
378/// This is a sealed trait and exists for internal use only. You never need to use it.
379#[expect(private_bounds, reason = "intentional - sealed trait")]
380pub trait PoolRef<T>: Deref<Target = OnceEventPool<T>> + ReflectiveTSend + Debug + Sealed
381where
382    T: Send,
383{
384}
385
386/// An event pool referenced via `&` shared reference.
387///
388/// Only used in type names. Instances are created internally by [`OnceEventPool`].
389#[derive(Copy)]
390pub struct RefPool<'a, T>
391where
392    T: Send,
393{
394    pool: &'a OnceEventPool<T>,
395}
396
397impl<T> Debug for RefPool<'_, T>
398where
399    T: Send,
400{
401    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
402        f.debug_struct("RefPool")
403            .field("item_type", &format_args!("{}", any::type_name::<T>()))
404            .finish_non_exhaustive()
405    }
406}
407
408impl<T> Sealed for RefPool<'_, T> where T: Send {}
409impl<T> PoolRef<T> for RefPool<'_, T> where T: Send {}
410impl<T> Deref for RefPool<'_, T>
411where
412    T: Send,
413{
414    type Target = OnceEventPool<T>;
415
416    fn deref(&self) -> &Self::Target {
417        self.pool
418    }
419}
420impl<T> Clone for RefPool<'_, T>
421where
422    T: Send,
423{
424    fn clone(&self) -> Self {
425        Self { pool: self.pool }
426    }
427}
428impl<T: Send> ReflectiveTSend for RefPool<'_, T> {
429    type T = T;
430}
431
432/// An event pool referenced via `Arc` shared reference.
433///
434/// Only used in type names. Instances are created internally by [`OnceEventPool`].
435pub struct ArcPool<T>
436where
437    T: Send,
438{
439    pool: Arc<OnceEventPool<T>>,
440}
441
442impl<T> Debug for ArcPool<T>
443where
444    T: Send,
445{
446    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447        f.debug_struct("ArcPool")
448            .field("item_type", &format_args!("{}", any::type_name::<T>()))
449            .finish_non_exhaustive()
450    }
451}
452
453impl<T> Sealed for ArcPool<T> where T: Send {}
454impl<T> PoolRef<T> for ArcPool<T> where T: Send {}
455impl<T> Deref for ArcPool<T>
456where
457    T: Send,
458{
459    type Target = OnceEventPool<T>;
460
461    fn deref(&self) -> &Self::Target {
462        &self.pool
463    }
464}
465impl<T> Clone for ArcPool<T>
466where
467    T: Send,
468{
469    fn clone(&self) -> Self {
470        Self {
471            pool: Arc::clone(&self.pool),
472        }
473    }
474}
475impl<T: Send> ReflectiveTSend for ArcPool<T> {
476    type T = T;
477}
478
479/// An event pool referenced via raw pointer.
480///
481/// Only used in type names. Instances are created internally by [`OnceEventPool`].
482#[derive(Copy)]
483pub struct PtrPool<T>
484where
485    T: Send,
486{
487    pool: NonNull<OnceEventPool<T>>,
488}
489
490impl<T> Debug for PtrPool<T>
491where
492    T: Send,
493{
494    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
495        f.debug_struct("PtrPool")
496            .field("item_type", &format_args!("{}", any::type_name::<T>()))
497            .finish_non_exhaustive()
498    }
499}
500
501impl<T> Sealed for PtrPool<T> where T: Send {}
502impl<T> PoolRef<T> for PtrPool<T> where T: Send {}
503impl<T> Deref for PtrPool<T>
504where
505    T: Send,
506{
507    type Target = OnceEventPool<T>;
508
509    fn deref(&self) -> &Self::Target {
510        // SAFETY: The owner of the sender/receiver pair is responsible for ensuring the pool
511        // outlives both, satisfying the lifetime requirement for as_ref().
512        unsafe { self.pool.as_ref() }
513    }
514}
515impl<T> Clone for PtrPool<T>
516where
517    T: Send,
518{
519    fn clone(&self) -> Self {
520        Self { pool: self.pool }
521    }
522}
523impl<T: Send> ReflectiveTSend for PtrPool<T> {
524    type T = T;
525}
526// SAFETY: This is only used with the thread-safe pool (the pool is Sync).
527unsafe impl<T> Send for PtrPool<T> where T: Send {}
528
529/// A receiver that can receive a single value through a thread-safe event.
530///
531/// The type of the value is the inner type parameter,
532/// i.e. the `T` in `PooledOnceReceiver<ArcPool<T>>`.
533///
534/// The outer type parameter determines the mechanism by which the endpoint is bound to the event
535/// pool. Different binding mechanisms offer different performance characteristics and resource
536/// management patterns.
537pub struct PooledOnceSender<P>
538where
539    P: PoolRef<<P as ReflectiveTSend>::T>,
540{
541    pool_ref: P,
542    event: RawPooled<OnceEvent<P::T>>,
543}
544
545impl<P> Debug for PooledOnceSender<P>
546where
547    P: PoolRef<<P as ReflectiveTSend>::T>,
548{
549    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550        f.debug_struct("PooledOnceSender")
551            .field("item_type", &format_args!("{}", any::type_name::<P::T>()))
552            .field("pool_ref", &self.pool_ref)
553            .field("event", &self.event)
554            .finish()
555    }
556}
557
558impl<P> PooledOnceSender<P>
559where
560    P: PoolRef<<P as ReflectiveTSend>::T>,
561{
562    /// Sends a value through the event.
563    ///
564    /// This method consumes the sender and always succeeds, regardless of whether
565    /// there is a receiver waiting.
566    ///
567    /// # Example
568    ///
569    /// ```rust
570    /// use events::OnceEventPool;
571    ///
572    /// let pool = OnceEventPool::new();
573    /// let (sender, receiver) = pool.bind_by_ref();
574    ///
575    /// sender.send(42);
576    /// let value = futures::executor::block_on(receiver).unwrap();
577    /// assert_eq!(value, 42);
578    /// ```
579    #[inline]
580    pub fn send(self, value: P::T) {
581        // The drop logic is different before/after set(), so we switch to manual drop here.
582        let mut this = ManuallyDrop::new(self);
583
584        // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
585        // for creating a reference. No exclusive references can exist because the events package
586        // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
587        let event = unsafe { this.event.as_ref() };
588
589        let set_result = event.set(value);
590
591        if set_result == Err(Disconnected) {
592            // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
593            // state machine ensures this is the only removal path for this event instance.
594            unsafe {
595                this.pool_ref
596                    .pool
597                    .lock()
598                    .expect(ERR_POISONED_LOCK)
599                    .remove(this.event);
600            }
601        }
602
603        // We also still need to drop the pool ref itself!
604        // SAFETY: This is a valid object and ManuallyDrop ensures it will not be auto-dropped.
605        unsafe {
606            ptr::drop_in_place(&raw mut this.pool_ref);
607        }
608    }
609}
610
611impl<P> Drop for PooledOnceSender<P>
612where
613    P: PoolRef<<P as ReflectiveTSend>::T>,
614{
615    #[inline]
616    fn drop(&mut self) {
617        // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
618        // for creating a reference. No exclusive references can exist because the events package
619        // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
620        let event = unsafe { self.event.as_ref() };
621
622        // This ensures receivers get Disconnected errors if the sender is dropped without sending.
623        if event.sender_dropped_without_set() == Err(Disconnected) {
624            // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
625            // state machine ensures this is the only removal path for this event instance.
626            unsafe {
627                self.pool_ref
628                    .pool
629                    .lock()
630                    .expect(ERR_POISONED_LOCK)
631                    .remove(self.event);
632            }
633        }
634    }
635}
636
637// SAFETY: The NonNull marks it !Send by default but we know that everything behind the pointer
638// is thread-safe, so all is well. We also require `Send` from `R` to be extra safe here.
639unsafe impl<P> Send for PooledOnceSender<P> where P: PoolRef<<P as ReflectiveTSend>::T> + Send {}
640
641/// A receiver that can receive a single value through a thread-safe event.
642///
643/// The type of the value is the inner type parameter,
644/// i.e. the `T` in `PooledOnceReceiver<ArcPool<T>>`.
645///
646/// The outer type parameter determines the mechanism by which the endpoint is bound to the event
647/// pool. Different binding mechanisms offer different performance characteristics and resource
648/// management patterns.
649pub struct PooledOnceReceiver<P>
650where
651    P: PoolRef<<P as ReflectiveTSend>::T>,
652{
653    pool_ref: P,
654    event: Option<RawPooled<OnceEvent<P::T>>>,
655}
656
657impl<P> Debug for PooledOnceReceiver<P>
658where
659    P: PoolRef<<P as ReflectiveTSend>::T>,
660{
661    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662        f.debug_struct("PooledOnceReceiver")
663            .field("item_type", &format_args!("{}", any::type_name::<P::T>()))
664            .field("pool_ref", &self.pool_ref)
665            .field("event", &self.event)
666            .finish()
667    }
668}
669
670impl<P> PooledOnceReceiver<P>
671where
672    P: PoolRef<<P as ReflectiveTSend>::T>,
673{
674    /// Checks whether a value is ready to be received.
675    ///
676    /// # Panics
677    ///
678    /// Panics if called after `poll()` has returned `Ready`.
679    pub fn is_ready(&self) -> bool {
680        let Some(event_handle) = &self.event else {
681            // Already pseudo-consumed the receiver as part of the Future impl.
682            panic!("receiver queried after completion");
683        };
684
685        // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
686        // for creating a reference. No exclusive references can exist because the events package
687        // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
688        let event = unsafe { event_handle.as_ref() };
689
690        event.is_set()
691    }
692
693    /// Consumes the receiver and transforms it into the received value, if the value is available.
694    ///
695    /// This method provides an alternative to awaiting the receiver when you want to check for
696    /// an immediately available value without blocking. It returns `Some(value)` if a value has
697    /// already been sent, or `None` if no value is currently available.
698    ///
699    /// # Panics
700    ///
701    /// Panics if the value has already been received via `Future::poll()`.
702    ///
703    /// # Examples
704    ///
705    /// ## Basic usage with Arc-based pool
706    ///
707    /// ```rust
708    /// use std::sync::Arc;
709    ///
710    /// use events::OnceEventPool;
711    ///
712    /// let pool = Arc::new(OnceEventPool::<String>::new());
713    /// let (sender, receiver) = pool.bind_by_arc();
714    /// sender.send("Hello from pool".to_string());
715    ///
716    /// // Value is immediately available
717    /// let value = receiver.into_value();
718    /// assert_eq!(value.unwrap(), Ok("Hello from pool".to_string()));
719    /// // Event is automatically returned to pool for reuse
720    /// ```
721    ///
722    /// ## No value available
723    ///
724    /// ```rust
725    /// use std::sync::Arc;
726    ///
727    /// use events::OnceEventPool;
728    ///
729    /// let pool = Arc::new(OnceEventPool::<i32>::new());
730    /// let (_sender, receiver) = pool.bind_by_arc();
731    ///
732    /// // No value sent yet - receiver is returned back
733    /// let result = receiver.into_value();
734    /// assert!(result.is_err()); // Returns Err(receiver)
735    /// // Event is still returned to pool
736    /// ```
737    ///
738    /// ## Using with reference-based pool binding
739    ///
740    /// ```rust
741    /// use events::OnceEventPool;
742    ///
743    /// let pool = OnceEventPool::<String>::new();
744    /// let (sender, receiver) = pool.bind_by_ref();
745    /// sender.send("Hello".to_string());
746    ///
747    /// let value = receiver.into_value().unwrap();
748    /// assert_eq!(value, Ok("Hello".to_string()));
749    /// ```
750    pub fn into_value(mut self) -> Result<Result<<P as ReflectiveTSend>::T, Disconnected>, Self> {
751        let Some(event_handle) = self.event.take() else {
752            // Already pseudo-consumed the receiver as part of the Future impl.
753            return Err(self);
754        };
755
756        // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
757        // for creating a reference. No exclusive references can exist because the events package
758        // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
759        let event = unsafe { event_handle.as_ref() };
760
761        // First check if the event is set (non-destructive)
762        if !event.is_set() {
763            // No value available yet and sender hasn't disconnected - return the receiver
764            self.event = Some(event_handle);
765            return Err(self);
766        }
767
768        // Event is set - use final_poll to determine if we have a value or disconnection
769        match event.final_poll() {
770            Ok(Some(value)) => {
771                // We have a value - clean up and return it
772                // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
773                // state machine ensures this is the only removal path for this event instance.
774                unsafe {
775                    self.pool_ref
776                        .pool
777                        .lock()
778                        .expect(ERR_POISONED_LOCK)
779                        .remove(event_handle);
780                }
781                Ok(Ok(value))
782            }
783            Ok(None) => {
784                // This shouldn't happen since is_set() returned true, but handle it gracefully
785                self.event = Some(event_handle);
786                Err(self)
787            }
788            Err(Disconnected) => {
789                // Sender disconnected - clean up and return Disconnected
790                // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
791                // state machine ensures this is the only removal path for this event instance.
792                unsafe {
793                    self.pool_ref
794                        .pool
795                        .lock()
796                        .expect(ERR_POISONED_LOCK)
797                        .remove(event_handle);
798                }
799                Ok(Err(Disconnected))
800            }
801        }
802    }
803
804    /// Drops the inner state, releasing the event back to the pool, returning the value (if any).
805    ///
806    /// May also be used from contexts where the receiver itself is not yet consumed.
807    fn drop_inner(&mut self) -> Option<<P as ReflectiveTSend>::T> {
808        let Some(event_handle) = self.event.take() else {
809            // Already pseudo-consumed the receiver as part of the Future impl.
810            return None;
811        };
812
813        // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
814        // for creating a reference. No exclusive references can exist because the events package
815        // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
816        let event = unsafe { event_handle.as_ref() };
817
818        let final_poll_result = event.final_poll();
819
820        match final_poll_result {
821            Ok(Some(value)) => {
822                // The sender has disconnected and sent a value, so we need to clean up.
823
824                // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
825                // state machine ensures this is the only removal path for this event instance.
826                unsafe {
827                    self.pool_ref
828                        .pool
829                        .lock()
830                        .expect(ERR_POISONED_LOCK)
831                        .remove(event_handle);
832                }
833
834                Some(value)
835            }
836            Ok(None) => {
837                // Nothing for us to do - the sender was still connected and had not
838                // sent any value, so it will perform the cleanup on its own.
839                None
840            }
841            Err(Disconnected) => {
842                // The sender has already disconnected, so we need to clean up the event.
843
844                // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
845                // state machine ensures this is the only removal path for this event instance.
846                unsafe {
847                    self.pool_ref
848                        .pool
849                        .lock()
850                        .expect(ERR_POISONED_LOCK)
851                        .remove(event_handle);
852                }
853
854                None
855            }
856        }
857    }
858}
859
860impl<P> Future for PooledOnceReceiver<P>
861where
862    P: PoolRef<<P as ReflectiveTSend>::T>,
863{
864    type Output = Result<P::T, Disconnected>;
865
866    #[inline]
867    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
868        // SAFETY: We are not moving anything, just touching internal state.
869        let this = unsafe { self.get_unchecked_mut() };
870
871        let event_handle = this
872            .event
873            .expect("polling a Future after completion is invalid");
874
875        // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
876        // for creating a reference. The handle is valid as it was created from this same pool.
877        let event = unsafe { event_handle.as_ref() };
878
879        let poll_result = event.poll(cx.waker());
880
881        poll_result.map_or_else(
882            || task::Poll::Pending,
883            |value| {
884                // Any result from the inner poll means we were the last endpoint connected,
885                // so we have to clean up the event now.
886
887                // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
888                // state machine ensures this is the only removal path for this event instance.
889                unsafe {
890                    this.pool_ref
891                        .pool
892                        .lock()
893                        .expect(ERR_POISONED_LOCK)
894                        .remove(event_handle);
895                }
896
897                // The cleanup is already all done by poll() when it returns a result.
898                // This just ensures panic on double poll (otherwise we would violate memory safety).
899                this.event = None;
900
901                task::Poll::Ready(value)
902            },
903        )
904    }
905}
906
907impl<P> Drop for PooledOnceReceiver<P>
908where
909    P: PoolRef<<P as ReflectiveTSend>::T>,
910{
911    #[inline]
912    fn drop(&mut self) {
913        self.drop_inner();
914    }
915}
916
// SAFETY: The NonNull marks it !Send by default but we know that everything behind the pointer
// is thread-safe, so all is well. We also require `Send` from `P` to be extra safe here.
unsafe impl<P> Send for PooledOnceReceiver<P> where P: PoolRef<<P as ReflectiveTSend>::T> + Send {}
920
921#[cfg(test)]
922mod tests {
923    use std::pin::pin;
924
925    use futures::task::noop_waker_ref;
926    use static_assertions::{assert_impl_all, assert_not_impl_any};
927    use testing::with_watchdog;
928
929    use super::*;
930
931    #[test]
932    fn event_pool_by_ref() {
933        with_watchdog(|| {
934            let pool = OnceEventPool::<i32>::new();
935
936            // Pool starts empty
937            assert_eq!(pool.len(), 0);
938            assert!(pool.is_empty());
939
940            let (sender, receiver) = pool.bind_by_ref();
941
942            // Pool should have 1 event while endpoints are bound
943            assert_eq!(pool.len(), 1);
944            assert!(!pool.is_empty());
945
946            // Receiver should not be ready before value is sent
947            assert!(!receiver.is_ready());
948
949            sender.send(42);
950
951            // Receiver should be ready after value is sent
952            assert!(receiver.is_ready());
953
954            let value = futures::executor::block_on(receiver).unwrap();
955            assert_eq!(value, 42);
956
957            // After endpoints are dropped, pool should be empty
958            assert_eq!(pool.len(), 0);
959            assert!(pool.is_empty());
960        });
961    }
962
963    #[test]
964    fn pool_drop_cleanup() {
965        with_watchdog(|| {
966            let pool = OnceEventPool::<i32>::new();
967
968            // Create and drop sender/receiver without using them
969            let (sender, receiver) = pool.bind_by_ref();
970            drop(sender);
971            drop(receiver);
972
973            // Pool should be empty (the event should have been cleaned up)
974            // This is implementation detail but shows the cleanup works
975        });
976    }
977
978    #[test]
979    fn pool_multiple_events() {
980        with_watchdog(|| {
981            let pool = OnceEventPool::<i32>::new();
982
983            // Test one event first
984            let (sender1, receiver1) = pool.bind_by_ref();
985            sender1.send(1);
986            let value1 = futures::executor::block_on(receiver1).unwrap();
987            assert_eq!(value1, 1);
988
989            // Test another event
990            let (sender2, receiver2) = pool.bind_by_ref();
991            sender2.send(2);
992            let value2 = futures::executor::block_on(receiver2).unwrap();
993            assert_eq!(value2, 2);
994        });
995    }
996
997    #[test]
998    fn event_pool_by_arc() {
999        with_watchdog(|| {
1000            let pool = Arc::new(OnceEventPool::<i32>::new());
1001
1002            // Pool starts empty
1003            assert_eq!(pool.len(), 0);
1004            assert!(pool.is_empty());
1005
1006            let (sender, receiver) = pool.bind_by_arc();
1007
1008            // Pool should have 1 event while endpoints are bound
1009            assert_eq!(pool.len(), 1);
1010            assert!(!pool.is_empty());
1011
1012            sender.send(42);
1013            let value = futures::executor::block_on(receiver).unwrap();
1014            assert_eq!(value, 42);
1015
1016            // After endpoints are dropped, pool should be empty
1017            assert_eq!(pool.len(), 0);
1018            assert!(pool.is_empty());
1019        });
1020    }
1021
1022    #[test]
1023    fn event_pool_by_ptr() {
1024        with_watchdog(|| {
1025            let pool = Box::pin(OnceEventPool::<i32>::new());
1026
1027            // Pool starts empty
1028            assert_eq!(pool.len(), 0);
1029            assert!(pool.is_empty());
1030
1031            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1032            let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1033
1034            // Pool should have 1 event while endpoints are bound
1035            assert_eq!(pool.len(), 1);
1036            assert!(!pool.is_empty());
1037
1038            sender.send(42);
1039            let value = futures::executor::block_on(receiver).unwrap();
1040            assert_eq!(value, 42);
1041
1042            // After endpoints are dropped, pool should be empty
1043            assert_eq!(pool.len(), 0);
1044            assert!(pool.is_empty());
1045        });
1046    }
1047
1048    // Memory leak detection tests - these specifically test that cleanup occurs on drop
1049    #[test]
1050    fn by_ref_sender_drop_cleanup() {
1051        with_watchdog(|| {
1052            let pool = OnceEventPool::<i32>::new();
1053            {
1054                let (sender, _receiver) = pool.bind_by_ref();
1055
1056                // Force the sender to be dropped without being consumed by send()
1057                drop(sender);
1058                // Receiver will be dropped at end of scope
1059            }
1060
1061            // Create a new event to verify the pool is still functional
1062            let (sender2, receiver2) = pool.bind_by_ref();
1063            sender2.send(123);
1064            let value = futures::executor::block_on(receiver2).unwrap();
1065            assert_eq!(value, 123);
1066        });
1067    }
1068
1069    #[test]
1070    fn by_ref_receiver_drop_cleanup() {
1071        with_watchdog(|| {
1072            let pool = OnceEventPool::<i32>::new();
1073            {
1074                let (_sender, receiver) = pool.bind_by_ref();
1075
1076                // Force the receiver to be dropped without being consumed by recv()
1077                drop(receiver);
1078                // Sender will be dropped at end of scope
1079            }
1080
1081            // Create a new event to verify the pool is still functional
1082            let (sender2, receiver2) = pool.bind_by_ref();
1083            sender2.send(456);
1084            let value = futures::executor::block_on(receiver2).unwrap();
1085            assert_eq!(value, 456);
1086        });
1087    }
1088
1089    #[test]
1090    fn by_arc_sender_drop_cleanup() {
1091        with_watchdog(|| {
1092            let pool = Arc::new(OnceEventPool::<i32>::new());
1093            let (sender, _receiver) = pool.bind_by_arc();
1094
1095            // Force the sender to be dropped without being consumed by send()
1096            drop(sender);
1097
1098            // Create a new event to verify the pool is still functional
1099            let (sender2, receiver2) = pool.bind_by_arc();
1100            sender2.send(654);
1101            let value = futures::executor::block_on(receiver2).unwrap();
1102            assert_eq!(value, 654);
1103        });
1104    }
1105
1106    #[test]
1107    fn by_arc_receiver_drop_cleanup() {
1108        with_watchdog(|| {
1109            let pool = Arc::new(OnceEventPool::<i32>::new());
1110            let (_sender, receiver) = pool.bind_by_arc();
1111
1112            // Force the receiver to be dropped without being consumed by recv()
1113            drop(receiver);
1114
1115            // Create a new event to verify the pool is still functional
1116            let (sender2, receiver2) = pool.bind_by_arc();
1117            sender2.send(987);
1118            let value = futures::executor::block_on(receiver2).unwrap();
1119            assert_eq!(value, 987);
1120        });
1121    }
1122
1123    #[test]
1124    fn by_ptr_sender_drop_cleanup() {
1125        with_watchdog(|| {
1126            let pool = Box::pin(OnceEventPool::<i32>::new());
1127
1128            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1129            let (sender, _receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1130
1131            // Force the sender to be dropped without being consumed by send()
1132            drop(sender);
1133
1134            // Create a new event to verify the pool is still functional
1135            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1136            let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
1137            sender2.send(147);
1138            let value = futures::executor::block_on(receiver2).unwrap();
1139            assert_eq!(value, 147);
1140        });
1141    }
1142
1143    #[test]
1144    fn by_ptr_receiver_drop_cleanup() {
1145        with_watchdog(|| {
1146            let pool = Box::pin(OnceEventPool::<i32>::new());
1147
1148            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1149            let (_sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1150
1151            // Force the receiver to be dropped without being consumed by recv()
1152            drop(receiver);
1153
1154            // Create a new event to verify the pool is still functional
1155            // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1156            let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
1157            sender2.send(258);
1158            let value = futures::executor::block_on(receiver2).unwrap();
1159            assert_eq!(value, 258);
1160        });
1161    }
1162
1163    #[test]
1164    fn dec_ref_and_cleanup_is_called() {
1165        with_watchdog(|| {
1166            let pool = OnceEventPool::<i32>::new();
1167
1168            // Create multiple events and drop them without using
1169            for _ in 0..5 {
1170                let (sender, receiver) = pool.bind_by_ref();
1171                drop(sender);
1172                drop(receiver);
1173            }
1174
1175            // Verify pool still works correctly after cleanup
1176            let (sender, receiver) = pool.bind_by_ref();
1177            sender.send(999);
1178            let value = futures::executor::block_on(receiver).unwrap();
1179            assert_eq!(value, 999);
1180        });
1181    }
1182
1183    #[test]
1184    fn pool_cleanup_verified_by_capacity() {
1185        with_watchdog(|| {
1186            let pool = OnceEventPool::<i32>::new();
1187
1188            // Create many events and drop them without using - this should not grow the pool permanently
1189            for i in 0..10 {
1190                let (sender, receiver) = pool.bind_by_ref();
1191                if i % 2 == 0 {
1192                    drop(sender);
1193                    drop(receiver);
1194                } else {
1195                    // Use some events normally
1196                    sender.send(i);
1197                    let _value = futures::executor::block_on(receiver);
1198                }
1199            }
1200
1201            // The pool should have cleaned up unused events
1202            // If cleanup is broken, the pool would retain all the unused events
1203            // This is a bit of an implementation detail but it's necessary to catch the leak
1204
1205            // Create one more event to verify pool still works
1206            let (sender, receiver) = pool.bind_by_ref();
1207            sender.send(42);
1208            let value = futures::executor::block_on(receiver).unwrap();
1209            assert_eq!(value, 42);
1210        });
1211    }
1212
1213    #[test]
1214    fn pool_stress_test_no_leak() {
1215        with_watchdog(|| {
1216            let pool = OnceEventPool::<u64>::new();
1217
1218            // Stress test with many dropped events
1219            for _ in 0..100 {
1220                let (sender, receiver) = pool.bind_by_ref();
1221                // Drop without using
1222                drop(sender);
1223                drop(receiver);
1224            }
1225
1226            // Pool should still work efficiently
1227            let (sender, receiver) = pool.bind_by_ref();
1228            sender.send(999);
1229            let value = futures::executor::block_on(receiver).unwrap();
1230            assert_eq!(value, 999);
1231        });
1232    }
1233
1234    #[test]
1235    fn by_ref_drop_actually_cleans_up_pool() {
1236        let pool = OnceEventPool::<u32>::new();
1237
1238        // Create many events but drop them without use
1239        for _ in 0..100 {
1240            let (_sender, _receiver) = pool.bind_by_ref();
1241            // Both sender and receiver will be dropped here
1242        }
1243
1244        // Pool should be cleaned up - all events should be removed
1245        // If Drop implementations don't work, pool will retain unused events
1246        let mut pool_guard = pool.pool.lock().unwrap();
1247        assert_eq!(
1248            pool_guard.len(),
1249            0,
1250            "Pool still contains unused events - Drop implementations not working"
1251        );
1252
1253        // An empty pool should be able to shrink to capacity 0
1254        pool_guard.shrink_to_fit();
1255        assert_eq!(
1256            pool_guard.capacity(),
1257            0,
1258            "Empty pool should shrink to capacity 0"
1259        );
1260    }
1261
1262    #[test]
1263    fn by_arc_drop_actually_cleans_up_pool() {
1264        let pool = Arc::new(OnceEventPool::<u32>::new());
1265
1266        // Create many events but drop them without use
1267        for _ in 0..100 {
1268            let (_sender, _receiver) = pool.bind_by_arc();
1269            // Both sender and receiver will be dropped here
1270        }
1271
1272        // Pool should be cleaned up - all events should be removed
1273        let mut pool_guard = pool.pool.lock().unwrap();
1274        assert_eq!(
1275            pool_guard.len(),
1276            0,
1277            "Pool still contains unused events - Drop implementations not working"
1278        );
1279
1280        // An empty pool should be able to shrink to capacity 0
1281        pool_guard.shrink_to_fit();
1282        assert_eq!(
1283            pool_guard.capacity(),
1284            0,
1285            "Empty pool should shrink to capacity 0"
1286        );
1287    }
1288
1289    #[test]
1290    fn by_ptr_drop_actually_cleans_up_pool() {
1291        // Test ptr-based pooled events cleanup by checking pool state
1292        for iteration in 0..10 {
1293            {
1294                let pool = Box::pin(OnceEventPool::<u32>::new());
1295                // SAFETY: We pin the pool for the duration of by_ptr call
1296                let (_sender, _receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1297                // sender and receiver will be dropped here
1298            }
1299
1300            // For this test, we'll verify that repeated operations don't accumulate
1301            // If Drop implementations don't work, we'd see memory accumulation
1302            println!("Iteration {iteration}: Pool operations completed");
1303        }
1304    }
1305
1306    #[test]
1307    fn dec_ref_and_cleanup_actually_removes_events() {
1308        let pool = OnceEventPool::<u32>::new();
1309
1310        // Test 1: Check that events are added to pool
1311        let pool_len_before = {
1312            let pool_guard = pool.pool.lock().unwrap();
1313            pool_guard.len()
1314        };
1315
1316        // Create events in a scope to ensure they're dropped
1317        {
1318            let (sender, receiver) = pool.bind_by_ref();
1319
1320            // Events should be in pool now (don't check len while borrowed)
1321
1322            drop(sender);
1323            drop(receiver);
1324        }
1325
1326        // Now check that cleanup worked
1327        let pool_len_after = {
1328            let pool_guard = pool.pool.lock().unwrap();
1329            pool_guard.len()
1330        };
1331        assert_eq!(
1332            pool_len_after, pool_len_before,
1333            "Pool not cleaned up after dropping events - dec_ref_and_cleanup not working"
1334        );
1335    }
1336
1337    #[test]
1338    fn shrink_to_fit_with_empty_pool_shrinks_to_zero() {
1339        let pool = OnceEventPool::<u32>::new();
1340
1341        // Create and drop events without using them
1342        for _ in 0..10 {
1343            drop(pool.bind_by_ref());
1344        }
1345
1346        assert_eq!(pool.len(), 0);
1347        assert!(pool.is_empty());
1348
1349        // Shrink the pool to fit
1350        pool.shrink_to_fit();
1351
1352        assert_eq!(
1353            pool.pool.lock().unwrap().capacity(),
1354            0,
1355            "Empty pool should shrink to capacity 0"
1356        );
1357    }
1358
1359    #[test]
1360    fn event_removed_from_pool_after_endpoints_immediate_drop() {
1361        let pool = OnceEventPool::<u32>::new();
1362
1363        drop(pool.bind_by_ref());
1364
1365        assert_eq!(pool.len(), 0);
1366        assert!(pool.is_empty());
1367    }
1368
1369    #[test]
1370    fn pool_len_and_is_empty_methods() {
1371        let pool = OnceEventPool::<u32>::new();
1372
1373        // Initially empty
1374        assert_eq!(pool.len(), 0);
1375        assert!(pool.is_empty());
1376
1377        // Create first event
1378        let (sender1, receiver1) = pool.bind_by_ref();
1379        assert_eq!(pool.len(), 1);
1380        assert!(!pool.is_empty());
1381
1382        // Create second event while first is still bound
1383        let (sender2, receiver2) = pool.bind_by_ref();
1384        assert_eq!(pool.len(), 2);
1385        assert!(!pool.is_empty());
1386
1387        // Drop first event endpoints
1388        drop(sender1);
1389        drop(receiver1);
1390        assert_eq!(pool.len(), 1);
1391        assert!(!pool.is_empty());
1392
1393        // Drop second event endpoints
1394        drop(sender2);
1395        drop(receiver2);
1396        assert_eq!(pool.len(), 0);
1397        assert!(pool.is_empty());
1398    }
1399
1400    #[test]
1401    fn pooled_event_receiver_gets_disconnected_when_sender_dropped() {
1402        with_watchdog(|| {
1403            futures::executor::block_on(async {
1404                let pool = OnceEventPool::<i32>::new();
1405                let (sender, receiver) = pool.bind_by_ref();
1406
1407                // Drop the sender without sending anything
1408                drop(sender);
1409
1410                // Receiver should get a Disconnected error
1411                let result = receiver.await;
1412                assert!(matches!(result, Err(Disconnected)));
1413                assert!(matches!(result, Err(Disconnected)));
1414            });
1415        });
1416    }
1417
1418    #[test]
1419    fn pooled_event_by_arc_receiver_gets_disconnected_when_sender_dropped() {
1420        with_watchdog(|| {
1421            futures::executor::block_on(async {
1422                let pool = Arc::new(OnceEventPool::<i32>::new());
1423                let (sender, receiver) = pool.bind_by_arc();
1424
1425                // Drop the sender without sending anything
1426                drop(sender);
1427
1428                // Receiver should get a Disconnected error
1429                let result = receiver.await;
1430                assert!(matches!(result, Err(Disconnected)));
1431                assert!(matches!(result, Err(Disconnected)));
1432            });
1433        });
1434    }
1435
1436    #[test]
1437    fn pooled_event_by_ptr_receiver_gets_disconnected_when_sender_dropped() {
1438        with_watchdog(|| {
1439            futures::executor::block_on(async {
1440                let pool = Box::pin(OnceEventPool::<i32>::new());
1441
1442                // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1443                let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1444
1445                // Drop the sender without sending anything
1446                drop(sender);
1447
1448                // Receiver should get a Disconnected error
1449                let result = receiver.await;
1450                assert!(result.is_err());
1451                assert!(matches!(result, Err(Disconnected)));
1452            });
1453        });
1454    }
1455
1456    #[test]
1457    fn pooled_sender_dropped_when_awaiting_signals_disconnected() {
1458        let pool = OnceEventPool::<i32>::new();
1459        let (sender, receiver) = pool.bind_by_ref();
1460
1461        let mut receiver = pin!(receiver);
1462        let mut context = task::Context::from_waker(noop_waker_ref());
1463        assert!(matches!(
1464            receiver.as_mut().poll(&mut context),
1465            task::Poll::Pending
1466        ));
1467
1468        drop(sender);
1469
1470        let mut context = task::Context::from_waker(noop_waker_ref());
1471        assert!(matches!(
1472            receiver.as_mut().poll(&mut context),
1473            task::Poll::Ready(Err(Disconnected))
1474        ));
1475    }
1476
1477    #[cfg(debug_assertions)]
1478    #[test]
1479    fn inspect_awaiters_empty_pool() {
1480        let pool = OnceEventPool::<i32>::new();
1481
1482        let mut count = 0;
1483        pool.inspect_awaiters(|_| {
1484            count += 1;
1485        });
1486
1487        assert_eq!(count, 0);
1488    }
1489
1490    #[cfg(debug_assertions)]
1491    #[test]
1492    fn inspect_awaiters_no_awaiters() {
1493        let pool = OnceEventPool::<String>::new();
1494
1495        // Create some events but don't await them. They must still be inspected.
1496        let (_sender1, _receiver1) = pool.bind_by_ref();
1497        let (_sender2, _receiver2) = pool.bind_by_ref();
1498
1499        let mut count = 0;
1500        pool.inspect_awaiters(|_| {
1501            count += 1;
1502        });
1503
1504        assert_eq!(count, 2);
1505    }
1506
1507    #[cfg(debug_assertions)]
1508    #[test]
1509    fn inspect_awaiters_with_awaiters() {
1510        let pool = OnceEventPool::<i32>::new();
1511
1512        // Create events and start awaiting them
1513        let (_sender1, receiver1) = pool.bind_by_ref();
1514        let (_sender2, receiver2) = pool.bind_by_ref();
1515
1516        let mut context = task::Context::from_waker(noop_waker_ref());
1517        let mut pinned_receiver1 = pin!(receiver1);
1518        let mut pinned_receiver2 = pin!(receiver2);
1519
1520        // Poll both receivers to create awaiters
1521        let _poll1 = pinned_receiver1.as_mut().poll(&mut context);
1522        let _poll2 = pinned_receiver2.as_mut().poll(&mut context);
1523
1524        let mut count = 0;
1525        pool.inspect_awaiters(|_backtrace| {
1526            count += 1;
1527        });
1528
1529        assert_eq!(count, 2);
1530    }
1531
1532    #[test]
1533    fn pooled_receiver_into_value_with_sent_value() {
1534        with_watchdog(|| {
1535            let pool = Arc::new(OnceEventPool::<String>::new());
1536            let (sender, receiver) = pool.bind_by_arc();
1537            sender.send("test value".to_string());
1538
1539            let result = receiver.into_value();
1540            assert_eq!(result.unwrap(), Ok("test value".to_string()));
1541        });
1542    }
1543
1544    #[test]
1545    fn pooled_receiver_into_value_no_value_sent() {
1546        with_watchdog(|| {
1547            let pool = Arc::new(OnceEventPool::<i32>::new());
1548            let (_sender, receiver) = pool.bind_by_arc();
1549
1550            let result = receiver.into_value();
1551            match result {
1552                Err(_) => {} // Expected - receiver returned
1553                _ => panic!("Expected NotReady error when sender not ready"),
1554            }
1555        });
1556    }
1557
1558    #[test]
1559    fn pooled_receiver_into_value_sender_disconnected() {
1560        with_watchdog(|| {
1561            let pool = Arc::new(OnceEventPool::<String>::new());
1562            let (sender, receiver) = pool.bind_by_arc();
1563            drop(sender); // Disconnect without sending
1564
1565            let result = receiver.into_value();
1566            match result {
1567                Ok(Err(Disconnected)) => {} // Expected - disconnected
1568                _ => panic!("Expected Ok(Err(Disconnected)) when sender disconnected"),
1569            }
1570        });
1571    }
1572
1573    #[test]
1574    fn pooled_receiver_into_value_with_ref_pool() {
1575        with_watchdog(|| {
1576            let pool = OnceEventPool::<i32>::new();
1577            let (sender, receiver) = pool.bind_by_ref();
1578            sender.send(42);
1579
1580            let result = receiver.into_value();
1581            assert_eq!(result.unwrap(), Ok(42));
1582        });
1583    }
1584
1585    #[test]
1586    fn pooled_receiver_into_value_with_arc_pool() {
1587        with_watchdog(|| {
1588            let pool = Arc::new(OnceEventPool::<String>::new());
1589            let (sender, receiver) = pool.bind_by_arc();
1590            sender.send("arc test".to_string());
1591
1592            let result = receiver.into_value();
1593            assert_eq!(result.unwrap(), Ok("arc test".to_string()));
1594        });
1595    }
1596
1597    #[test]
1598    fn pooled_receiver_into_value_with_ptr_pool() {
1599        with_watchdog(|| {
1600            let pool = Box::pin(OnceEventPool::<i32>::new());
1601            // SAFETY: Pool is pinned and outlives the sender/receiver.
1602            let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1603            sender.send(999);
1604
1605            let result = receiver.into_value();
1606            assert_eq!(result.unwrap(), Ok(999));
1607        });
1608    }
1609
1610    #[test]
1611    fn pooled_receiver_into_value_returns_none_after_poll() {
1612        with_watchdog(|| {
1613            futures::executor::block_on(async {
1614                let pool = Arc::new(OnceEventPool::<i32>::new());
1615                let (sender, mut receiver) = pool.bind_by_arc();
1616                sender.send(42);
1617
1618                // Poll the receiver first
1619                let waker = noop_waker_ref();
1620                let mut context = task::Context::from_waker(waker);
1621                let poll_result = Pin::new(&mut receiver).poll(&mut context);
1622                assert_eq!(poll_result, task::Poll::Ready(Ok(42)));
1623
1624                // This should return None since the receiver was already consumed
1625                let value = receiver.into_value();
1626                match value {
1627                    Err(_) => {} // Expected - receiver returned after consumption
1628                    _ => panic!("Expected NotReady error after receiver consumption"),
1629                }
1630            });
1631        });
1632    }
1633
1634    #[test]
1635    fn pooled_receiver_into_value_pool_reuse() {
1636        with_watchdog(|| {
1637            let pool = Arc::new(OnceEventPool::<i32>::new());
1638
1639            // First usage
1640            let (sender1, receiver1) = pool.bind_by_arc();
1641            sender1.send(123);
1642            let result1 = receiver1.into_value();
1643            assert_eq!(result1.unwrap(), Ok(123));
1644
1645            // Second usage - should reuse the event from the pool
1646            let (sender2, receiver2) = pool.bind_by_arc();
1647            sender2.send(456);
1648            let result2 = receiver2.into_value();
1649            assert_eq!(result2.unwrap(), Ok(456));
1650        });
1651    }
1652
1653    #[test]
1654    fn pooled_receiver_into_value_multiple_event_types() {
1655        with_watchdog(|| {
1656            // Test with different value types
1657            let pool1 = Arc::new(OnceEventPool::<()>::new());
1658            let (sender1, receiver1) = pool1.bind_by_arc();
1659            sender1.send(());
1660            assert_eq!(receiver1.into_value().unwrap(), Ok(()));
1661
1662            let pool2 = Arc::new(OnceEventPool::<Vec<i32>>::new());
1663            let (sender2, receiver2) = pool2.bind_by_arc();
1664            sender2.send(vec![1, 2, 3]);
1665            assert_eq!(receiver2.into_value().unwrap(), Ok(vec![1, 2, 3]));
1666
1667            let pool3 = Arc::new(OnceEventPool::<Option<String>>::new());
1668            let (sender3, receiver3) = pool3.bind_by_arc();
1669            sender3.send(Some("nested option".to_string()));
1670            assert_eq!(
1671                receiver3.into_value().unwrap(),
1672                Ok(Some("nested option".to_string()))
1673            );
1674        });
1675    }
1676
1677    #[test]
1678    fn thread_safety() {
1679        // The pool is accessed across threads, so requires Sync as well as Send.
1680        assert_impl_all!(OnceEventPool<u32>: Send, Sync);
1681
1682        // These are all meant to be consumed locally - they may move between threads but are
1683        // not shared between threads, so Sync is not expected, only Send.
1684        assert_impl_all!(PooledOnceSender<RefPool<'static, u32>>: Send);
1685        assert_impl_all!(PooledOnceReceiver<RefPool<'static, u32>>: Send);
1686        assert_impl_all!(PooledOnceSender<ArcPool<u32>>: Send);
1687        assert_impl_all!(PooledOnceReceiver<ArcPool<u32>>: Send);
1688        assert_impl_all!(PooledOnceSender<PtrPool<u32>>: Send);
1689        assert_impl_all!(PooledOnceReceiver<PtrPool<u32>>: Send);
1690        assert_not_impl_any!(PooledOnceSender<RefPool<'static, u32>>: Sync);
1691        assert_not_impl_any!(PooledOnceReceiver<RefPool<'static, u32>>: Sync);
1692        assert_not_impl_any!(PooledOnceSender<ArcPool<u32>>: Sync);
1693        assert_not_impl_any!(PooledOnceReceiver<ArcPool<u32>>: Sync);
1694        assert_not_impl_any!(PooledOnceSender<PtrPool<u32>>: Sync);
1695        assert_not_impl_any!(PooledOnceReceiver<PtrPool<u32>>: Sync);
1696    }
1697}