events/once/pooled_sync.rs
1//! Pooled events that provide automatic resource management.
2//!
3//! This module provides pooled variants of events that automatically manage their lifecycle
4//! using reference counting. Events are created from pools and automatically returned to the
5//! pool when both sender and receiver are dropped.
6
7#[cfg(debug_assertions)]
8use std::backtrace::Backtrace;
9use std::future::Future;
10use std::marker::PhantomPinned;
11use std::ops::Deref;
12use std::pin::Pin;
13use std::ptr::NonNull;
14use std::sync::{Arc, Mutex};
15use std::task;
16
17use pinned_pool::{Key, PinnedPool};
18
19use crate::{Disconnected, ERR_POISONED_LOCK, OnceEvent, ReflectiveTSend, Sealed, WithTwoOwners};
20
21/// A pool that manages thread-safe events with automatic cleanup.
22///
23/// The pool creates events on demand and automatically cleans them up when both
24/// sender and receiver endpoints are dropped.
25///
26/// This pool provides zero-allocation event reuse for high-frequency eventing scenarios
27/// in a thread-safe manner.
28///
29/// # Example
30///
31/// ```rust
32/// use events::OnceEventPool;
33/// # use futures::executor::block_on;
34///
35/// # block_on(async {
36/// let pool = OnceEventPool::<i32>::new();
37///
38/// // First usage - creates new event
39/// let (sender1, receiver1) = pool.bind_by_ref();
40/// sender1.send(42);
41/// let value1 = receiver1.await.unwrap();
42/// assert_eq!(value1, 42);
43/// // Event returned to pool when sender1/receiver1 are dropped
44///
45/// // Second usage - reuses the same event instance (efficient!)
46/// let (sender2, receiver2) = pool.bind_by_ref();
47/// sender2.send(100);
48/// let value2 = receiver2.await.unwrap();
49/// assert_eq!(value2, 100);
50/// // Same event reused - no additional allocation overhead
51/// # });
52/// ```
53#[derive(Debug)]
54pub struct OnceEventPool<T>
55where
56 T: Send,
57{
58 pool: Mutex<PinnedPool<WithTwoOwners<OnceEvent<T>>>>,
59
60 // It is invalid to move this type once it has been pinned.
61 _requires_pinning: PhantomPinned,
62}
63
64impl<T> OnceEventPool<T>
65where
66 T: Send,
67{
68 /// Creates a new empty event pool.
69 ///
70 /// # Example
71 ///
72 /// ```rust
73 /// use events::OnceEventPool;
74 ///
75 /// let pool = OnceEventPool::<String>::new();
76 /// ```
77 #[must_use]
78 pub fn new() -> Self {
79 Self {
80 pool: Mutex::new(PinnedPool::new()),
81 _requires_pinning: PhantomPinned,
82 }
83 }
84
85 /// Creates sender and receiver endpoints connected by reference to the pool.
86 ///
87 /// The pool will create a new event and return endpoints that reference it.
88 /// When both endpoints are dropped, the event will be automatically cleaned up.
89 ///
90 /// # Example
91 ///
92 /// ```rust
93 /// use events::OnceEventPool;
94 /// # use futures::executor::block_on;
95 ///
96 /// # block_on(async {
97 /// let pool = OnceEventPool::<i32>::new();
98 ///
99 /// // First event usage
100 /// let (sender1, receiver1) = pool.bind_by_ref();
101 /// sender1.send(42);
102 /// let value1 = receiver1.await.unwrap();
103 /// assert_eq!(value1, 42);
104 ///
105 /// // Second event usage - efficiently reuses the same underlying event
106 /// let (sender2, receiver2) = pool.bind_by_ref();
107 /// sender2.send(100);
108 /// let value2 = receiver2.await.unwrap();
109 /// assert_eq!(value2, 100);
110 /// # });
111 /// ```
112 #[must_use]
113 pub fn bind_by_ref(
114 &self,
115 ) -> (
116 PooledOnceSender<RefPool<'_, T>>,
117 PooledOnceReceiver<RefPool<'_, T>>,
118 ) {
119 let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
120
121 let inserter = inner_pool.begin_insert();
122 let key = inserter.key();
123
124 let item = inserter.insert(WithTwoOwners::new(OnceEvent::new()));
125
126 let item_ptr = NonNull::from(item.get_ref());
127
128 let pool_ref = RefPool { pool: self };
129
130 (
131 PooledOnceSender {
132 event: Some(item_ptr),
133 pool_ref: pool_ref.clone(),
134 key,
135 },
136 PooledOnceReceiver {
137 event: Some(item_ptr),
138 pool_ref,
139 key,
140 },
141 )
142 }
143
144 /// Creates sender and receiver endpoints connected by Arc to the pool.
145 ///
146 /// The pool will create a new event and return endpoints that hold Arc references.
147 /// When both endpoints are dropped, the event will be automatically cleaned up.
148 ///
149 /// # Example
150 ///
151 /// ```rust
152 /// use std::sync::Arc;
153 ///
154 /// use events::OnceEventPool;
155 ///
156 /// let pool = Arc::new(OnceEventPool::<i32>::new());
157 ///
158 /// // First usage
159 /// let (sender1, receiver1) = pool.bind_by_arc();
160 /// sender1.send(42);
161 /// let value1 = futures::executor::block_on(receiver1).unwrap();
162 /// assert_eq!(value1, 42);
163 ///
164 /// // Second usage - efficiently reuses the same pooled event
165 /// let (sender2, receiver2) = pool.bind_by_arc();
166 /// sender2.send(200);
167 /// let value2 = futures::executor::block_on(receiver2).unwrap();
168 /// assert_eq!(value2, 200);
169 /// ```
170 #[must_use]
171 pub fn bind_by_arc(
172 self: &Arc<Self>,
173 ) -> (PooledOnceSender<ArcPool<T>>, PooledOnceReceiver<ArcPool<T>>) {
174 let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
175
176 let inserter = inner_pool.begin_insert();
177 let key = inserter.key();
178
179 let item = inserter.insert(WithTwoOwners::new(OnceEvent::new()));
180
181 let item_ptr = NonNull::from(item.get_ref());
182
183 let pool_ref = ArcPool {
184 pool: Arc::clone(self),
185 };
186
187 (
188 PooledOnceSender {
189 event: Some(item_ptr),
190 pool_ref: pool_ref.clone(),
191 key,
192 },
193 PooledOnceReceiver {
194 event: Some(item_ptr),
195 pool_ref,
196 key,
197 },
198 )
199 }
200
201 /// Creates sender and receiver endpoints connected by raw pointer to the pool.
202 ///
203 /// The pool will create a new event and return endpoints that hold raw pointers.
204 /// When both endpoints are dropped, the event will be automatically cleaned up.
205 ///
206 /// # Safety
207 ///
208 /// The caller must ensure that:
209 /// - The pool remains valid and pinned for the entire lifetime of the sender and receiver
210 /// - The sender and receiver are dropped before the pool is dropped
211 ///
212 /// # Example
213 ///
214 /// ```rust
215 /// use events::OnceEventPool;
216 ///
217 /// let pool = Box::pin(OnceEventPool::<i32>::new());
218 ///
219 /// // First usage
220 /// // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
221 /// let (sender1, receiver1) = unsafe { pool.as_ref().bind_by_ptr() };
222 /// sender1.send(42);
223 /// let value1 = futures::executor::block_on(receiver1).unwrap();
224 /// assert_eq!(value1, 42);
225 ///
226 /// // Second usage - reuses the same event from the pool efficiently
227 /// // SAFETY: Pool is still valid and pinned
228 /// let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
229 /// sender2.send(100);
230 /// let value2 = futures::executor::block_on(receiver2).unwrap();
231 /// assert_eq!(value2, 100);
232 /// // Both sender and receiver pairs are dropped here, before pool
233 /// ```
234 #[must_use]
235 pub unsafe fn bind_by_ptr(
236 self: Pin<&Self>,
237 ) -> (PooledOnceSender<PtrPool<T>>, PooledOnceReceiver<PtrPool<T>>) {
238 let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
239
240 let inserter = inner_pool.begin_insert();
241 let key = inserter.key();
242
243 let item = inserter.insert(WithTwoOwners::new(OnceEvent::new()));
244
245 let item_ptr = NonNull::from(item.get_ref());
246
247 let pool_ref = PtrPool {
248 pool: NonNull::from(self.get_ref()),
249 };
250
251 (
252 PooledOnceSender {
253 event: Some(item_ptr),
254 pool_ref: pool_ref.clone(),
255 key,
256 },
257 PooledOnceReceiver {
258 event: Some(item_ptr),
259 pool_ref,
260 key,
261 },
262 )
263 }
264
265 /// Returns the number of events currently in the pool.
266 ///
267 /// This represents the count of events that are currently allocated in the pool,
268 /// including those that are currently bound to sender/receiver endpoints.
269 /// Events are removed from the pool only when both endpoints are dropped.
270 ///
271 /// # Example
272 ///
273 /// ```rust
274 /// use events::OnceEventPool;
275 ///
276 /// let pool = OnceEventPool::<i32>::new();
277 /// assert_eq!(pool.len(), 0);
278 ///
279 /// let (sender, receiver) = pool.bind_by_ref();
280 /// assert_eq!(pool.len(), 1); // Event is in pool while endpoints exist
281 ///
282 /// drop(sender);
283 /// drop(receiver);
284 /// assert_eq!(pool.len(), 0); // Event cleaned up after both endpoints dropped
285 /// ```
286 #[must_use]
287 pub fn len(&self) -> usize {
288 self.pool.lock().expect(ERR_POISONED_LOCK).len()
289 }
290
291 /// Returns whether the pool is empty.
292 ///
293 /// This is equivalent to `pool.len() == 0` but may be more efficient.
294 /// An empty pool may still have reserved capacity.
295 ///
296 /// # Example
297 ///
298 /// ```rust
299 /// use events::OnceEventPool;
300 ///
301 /// let pool = OnceEventPool::<i32>::new();
302 /// assert!(pool.is_empty());
303 ///
304 /// let (sender, receiver) = pool.bind_by_ref();
305 /// assert!(!pool.is_empty()); // Pool has event while endpoints exist
306 ///
307 /// drop(sender);
308 /// drop(receiver);
309 /// assert!(pool.is_empty()); // Pool empty after both endpoints dropped
310 /// ```
311 #[must_use]
312 pub fn is_empty(&self) -> bool {
313 self.pool.lock().expect(ERR_POISONED_LOCK).is_empty()
314 }
315
316 /// Shrinks the capacity of the pool to reduce memory usage.
317 ///
318 /// This method attempts to release unused memory by reducing the pool's capacity.
319 /// The actual reduction is implementation-dependent and may vary - some capacity
320 /// may be released, or none at all.
321 ///
322 /// # Example
323 ///
324 /// ```rust
325 /// use events::OnceEventPool;
326 ///
327 /// let pool = OnceEventPool::<i32>::new();
328 ///
329 /// // Use the pool which may grow its capacity
330 /// for _ in 0..100 {
331 /// let (sender, receiver) = pool.bind_by_ref();
332 /// sender.send(42);
333 /// let _value = futures::executor::block_on(receiver);
334 /// }
335 ///
336 /// // Attempt to shrink to reduce memory usage
337 /// pool.shrink_to_fit();
338 /// ```
339 pub fn shrink_to_fit(&self) {
340 let mut inner_pool = self.pool.lock().expect("pool mutex should not be poisoned");
341 inner_pool.shrink_to_fit();
342 }
343
344 /// Uses the provided closure to inspect the backtraces of the current awaiter of each
345 /// event in the pool that is currently being awaited by someone.
346 ///
347 /// This method is only available in debug builds (`cfg(debug_assertions)`).
348 /// For any data to be present, `RUST_BACKTRACE=1` or `RUST_LIB_BACKTRACE=1` must be set.
349 ///
350 /// The closure is called once for each event in the pool that is currently being awaited by
351 /// someone.
352 #[cfg(debug_assertions)]
353 pub fn inspect_awaiters(&self, mut f: impl FnMut(&Backtrace)) {
354 let inner_pool = self.pool.lock().expect("pool mutex should not be poisoned");
355
356 for event in inner_pool.iter() {
357 event.inspect_awaiter(|bt| {
358 if let Some(bt) = bt {
359 f(bt);
360 }
361 });
362 }
363 }
364}
365
366impl<T> Default for OnceEventPool<T>
367where
368 T: Send,
369{
370 fn default() -> Self {
371 Self::new()
372 }
373}
374
375/// Enables a sender or receiver to reference the pool that stores the event that connects them.
376///
377/// This is a sealed trait and exists for internal use only. You never need to use it.
378#[expect(private_bounds, reason = "intentional - sealed trait")]
379pub trait PoolRef<T>: Deref<Target = OnceEventPool<T>> + ReflectiveTSend + Sealed
380where
381 T: Send,
382{
383}
384
385/// An event pool referenced via `&` shared reference.
386///
387/// Only used in type names. Instances are created internally by [`OnceEventPool`].
388#[derive(Copy, Debug)]
389pub struct RefPool<'a, T>
390where
391 T: Send,
392{
393 pool: &'a OnceEventPool<T>,
394}
395
396impl<T> Sealed for RefPool<'_, T> where T: Send {}
397impl<T> PoolRef<T> for RefPool<'_, T> where T: Send {}
398impl<T> Deref for RefPool<'_, T>
399where
400 T: Send,
401{
402 type Target = OnceEventPool<T>;
403
404 fn deref(&self) -> &Self::Target {
405 self.pool
406 }
407}
408impl<T> Clone for RefPool<'_, T>
409where
410 T: Send,
411{
412 fn clone(&self) -> Self {
413 Self { pool: self.pool }
414 }
415}
416impl<T: Send> ReflectiveTSend for RefPool<'_, T> {
417 type T = T;
418}
419
420/// An event pool referenced via `Arc` shared reference.
421///
422/// Only used in type names. Instances are created internally by [`OnceEventPool`].
423#[derive(Debug)]
424pub struct ArcPool<T>
425where
426 T: Send,
427{
428 pool: Arc<OnceEventPool<T>>,
429}
430
431impl<T> Sealed for ArcPool<T> where T: Send {}
432impl<T> PoolRef<T> for ArcPool<T> where T: Send {}
433impl<T> Deref for ArcPool<T>
434where
435 T: Send,
436{
437 type Target = OnceEventPool<T>;
438
439 fn deref(&self) -> &Self::Target {
440 &self.pool
441 }
442}
443impl<T> Clone for ArcPool<T>
444where
445 T: Send,
446{
447 fn clone(&self) -> Self {
448 Self {
449 pool: Arc::clone(&self.pool),
450 }
451 }
452}
453impl<T: Send> ReflectiveTSend for ArcPool<T> {
454 type T = T;
455}
456
457/// An event pool referenced via raw pointer.
458///
459/// Only used in type names. Instances are created internally by [`OnceEventPool`].
460#[derive(Copy, Debug)]
461pub struct PtrPool<T>
462where
463 T: Send,
464{
465 pool: NonNull<OnceEventPool<T>>,
466}
467
468impl<T> Sealed for PtrPool<T> where T: Send {}
469impl<T> PoolRef<T> for PtrPool<T> where T: Send {}
470impl<T> Deref for PtrPool<T>
471where
472 T: Send,
473{
474 type Target = OnceEventPool<T>;
475
476 fn deref(&self) -> &Self::Target {
477 // SAFETY: The creator of the reference is responsible for ensuring the pool outlives it.
478 unsafe { self.pool.as_ref() }
479 }
480}
481impl<T> Clone for PtrPool<T>
482where
483 T: Send,
484{
485 fn clone(&self) -> Self {
486 Self { pool: self.pool }
487 }
488}
489impl<T: Send> ReflectiveTSend for PtrPool<T> {
490 type T = T;
491}
492// SAFETY: This is only used with the thread-safe pool (the pool is Sync).
493unsafe impl<T> Send for PtrPool<T> where T: Send {}
494
495/// A receiver that can receive a single value through a thread-safe event.
496///
497/// The type of the value is the inner type parameter,
498/// i.e. the `T` in `PooledOnceReceiver<ArcPool<T>>`.
499///
500/// The outer type parameter determines the mechanism by which the endpoint is bound to the event
501/// pool. Different binding mechanisms offer different performance characteristics and resource
502/// management patterns.
503#[derive(Debug)]
504pub struct PooledOnceSender<P>
505where
506 P: PoolRef<<P as ReflectiveTSend>::T>,
507{
508 // This is a pointer to avoid contaminating the type signature with the event lifetime.
509 //
510 // SAFETY: We rely on the inner pool guaranteeing pinning and us owning a counted reference.
511 event: Option<NonNull<WithTwoOwners<OnceEvent<P::T>>>>,
512
513 pool_ref: P,
514 key: Key,
515}
516
517impl<P> PooledOnceSender<P>
518where
519 P: PoolRef<<P as ReflectiveTSend>::T>,
520{
521 /// Sends a value through the event.
522 ///
523 /// This method consumes the sender and always succeeds, regardless of whether
524 /// there is a receiver waiting.
525 ///
526 /// # Example
527 ///
528 /// ```rust
529 /// use events::OnceEventPool;
530 ///
531 /// let pool = OnceEventPool::new();
532 /// let (sender, receiver) = pool.bind_by_ref();
533 ///
534 /// sender.send(42);
535 /// let value = futures::executor::block_on(receiver).unwrap();
536 /// assert_eq!(value, 42);
537 /// ```
538 #[inline]
539 pub fn send(self, value: P::T) {
540 // SAFETY: See comments on field.
541 let event = unsafe {
542 self.event
543 .expect("event is only None during destruction")
544 .as_ref()
545 };
546
547 event.set(value);
548 }
549}
550
551impl<P> Drop for PooledOnceSender<P>
552where
553 P: PoolRef<<P as ReflectiveTSend>::T>,
554{
555 #[inline]
556 fn drop(&mut self) {
557 // SAFETY: See comments on field.
558 let event = unsafe { self.event.expect("only possible on double drop").as_ref() };
559
560 // The event is going to be destroyed, so we cannot reference it anymore.
561 self.event = None;
562
563 // Signal that the sender was dropped before handling reference counting.
564 // This ensures receivers get Disconnected errors if the sender is dropped without sending.
565 event.sender_dropped();
566
567 if event.release_one() {
568 self.pool_ref
569 .pool
570 .lock()
571 .expect(ERR_POISONED_LOCK)
572 .remove(self.key);
573 }
574 }
575}
576
577// SAFETY: The NonNull marks it !Send by default but we know that everything behind the pointer
578// is thread-safe, so all is well. We also require `Send` from `R` to be extra safe here.
579unsafe impl<P> Send for PooledOnceSender<P> where P: PoolRef<<P as ReflectiveTSend>::T> + Send {}
580
581/// A receiver that can receive a single value through a thread-safe event.
582///
583/// The type of the value is the inner type parameter,
584/// i.e. the `T` in `PooledOnceReceiver<ArcPool<T>>`.
585///
586/// The outer type parameter determines the mechanism by which the endpoint is bound to the event
587/// pool. Different binding mechanisms offer different performance characteristics and resource
588/// management patterns.
589#[derive(Debug)]
590pub struct PooledOnceReceiver<P>
591where
592 P: PoolRef<<P as ReflectiveTSend>::T>,
593{
594 // This is a pointer to avoid contaminating the type signature with the event lifetime.
595 //
596 // SAFETY: We rely on the inner pool guaranteeing pinning and us owning a counted reference.
597 event: Option<NonNull<WithTwoOwners<OnceEvent<P::T>>>>,
598
599 pool_ref: P,
600 key: Key,
601}
602
603impl<P> PooledOnceReceiver<P>
604where
605 P: PoolRef<<P as ReflectiveTSend>::T>,
606{
607 /// Drops the inner state, releasing the event back to the pool.
608 /// May also be used from contexts where the receiver itself is not yet consumed.
609 fn drop_inner(&mut self) {
610 let Some(event) = self.event else {
611 // Already pseudo-consumed the receiver as part of the Future impl.
612 return;
613 };
614
615 // Regardless of whether we were the last reference holder or not, we are no longer
616 // allowed to reference the event as we are releasing our reference.
617 self.event = None;
618
619 // SAFETY: See comments on field.
620 let event = unsafe { event.as_ref() };
621
622 if event.release_one() {
623 self.pool_ref
624 .pool
625 .lock()
626 .expect(ERR_POISONED_LOCK)
627 .remove(self.key);
628 }
629 }
630}
631
632impl<P> Future for PooledOnceReceiver<P>
633where
634 P: PoolRef<<P as ReflectiveTSend>::T>,
635{
636 type Output = Result<P::T, Disconnected>;
637
638 #[inline]
639 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
640 // SAFETY: We are not moving anything, just touching internal state.
641 let this = unsafe { self.get_unchecked_mut() };
642
643 // SAFETY: See comments on field.
644 let event = unsafe {
645 this.event
646 .expect("polling a Future after completion is invalid")
647 .as_ref()
648 };
649
650 let poll_result = event.poll(cx.waker());
651
652 poll_result.map_or_else(
653 || task::Poll::Pending,
654 |value| {
655 this.drop_inner();
656 task::Poll::Ready(value)
657 },
658 )
659 }
660}
661
662impl<P> Drop for PooledOnceReceiver<P>
663where
664 P: PoolRef<<P as ReflectiveTSend>::T>,
665{
666 #[inline]
667 fn drop(&mut self) {
668 self.drop_inner();
669 }
670}
671
672// SAFETY: The NonNull marks it !Send by default but we know that everything behind the pointer
673// is thread-safe, so all is well. We also require `Send` from `R` to be extra safe here.
674unsafe impl<P> Send for PooledOnceReceiver<P> where P: PoolRef<<P as ReflectiveTSend>::T> + Send {}
675
676#[cfg(test)]
677mod tests {
678 use std::pin::pin;
679
680 use futures::task::noop_waker_ref;
681 use static_assertions::{assert_impl_all, assert_not_impl_any};
682 use testing::with_watchdog;
683
684 use super::*;
685
686 #[test]
687 fn event_pool_by_ref() {
688 with_watchdog(|| {
689 let pool = OnceEventPool::<i32>::new();
690
691 // Pool starts empty
692 assert_eq!(pool.len(), 0);
693 assert!(pool.is_empty());
694
695 let (sender, receiver) = pool.bind_by_ref();
696
697 // Pool should have 1 event while endpoints are bound
698 assert_eq!(pool.len(), 1);
699 assert!(!pool.is_empty());
700
701 sender.send(42);
702 let value = futures::executor::block_on(receiver).unwrap();
703 assert_eq!(value, 42);
704
705 // After endpoints are dropped, pool should be empty
706 assert_eq!(pool.len(), 0);
707 assert!(pool.is_empty());
708 });
709 }
710
711 #[test]
712 fn pool_drop_cleanup() {
713 with_watchdog(|| {
714 let pool = OnceEventPool::<i32>::new();
715
716 // Create and drop sender/receiver without using them
717 let (sender, receiver) = pool.bind_by_ref();
718 drop(sender);
719 drop(receiver);
720
721 // Pool should be empty (the event should have been cleaned up)
722 // This is implementation detail but shows the cleanup works
723 });
724 }
725
726 #[test]
727 fn pool_multiple_events() {
728 with_watchdog(|| {
729 let pool = OnceEventPool::<i32>::new();
730
731 // Test one event first
732 let (sender1, receiver1) = pool.bind_by_ref();
733 sender1.send(1);
734 let value1 = futures::executor::block_on(receiver1).unwrap();
735 assert_eq!(value1, 1);
736
737 // Test another event
738 let (sender2, receiver2) = pool.bind_by_ref();
739 sender2.send(2);
740 let value2 = futures::executor::block_on(receiver2).unwrap();
741 assert_eq!(value2, 2);
742 });
743 }
744
745 #[test]
746 fn event_pool_by_arc() {
747 with_watchdog(|| {
748 let pool = Arc::new(OnceEventPool::<i32>::new());
749
750 // Pool starts empty
751 assert_eq!(pool.len(), 0);
752 assert!(pool.is_empty());
753
754 let (sender, receiver) = pool.bind_by_arc();
755
756 // Pool should have 1 event while endpoints are bound
757 assert_eq!(pool.len(), 1);
758 assert!(!pool.is_empty());
759
760 sender.send(42);
761 let value = futures::executor::block_on(receiver).unwrap();
762 assert_eq!(value, 42);
763
764 // After endpoints are dropped, pool should be empty
765 assert_eq!(pool.len(), 0);
766 assert!(pool.is_empty());
767 });
768 }
769
770 #[test]
771 fn event_pool_by_ptr() {
772 with_watchdog(|| {
773 let pool = Box::pin(OnceEventPool::<i32>::new());
774
775 // Pool starts empty
776 assert_eq!(pool.len(), 0);
777 assert!(pool.is_empty());
778
779 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
780 let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
781
782 // Pool should have 1 event while endpoints are bound
783 assert_eq!(pool.len(), 1);
784 assert!(!pool.is_empty());
785
786 sender.send(42);
787 let value = futures::executor::block_on(receiver).unwrap();
788 assert_eq!(value, 42);
789
790 // After endpoints are dropped, pool should be empty
791 assert_eq!(pool.len(), 0);
792 assert!(pool.is_empty());
793 });
794 }
795
796 // Memory leak detection tests - these specifically test that cleanup occurs on drop
797 #[test]
798 fn by_ref_sender_drop_cleanup() {
799 with_watchdog(|| {
800 let pool = OnceEventPool::<i32>::new();
801 {
802 let (sender, _receiver) = pool.bind_by_ref();
803
804 // Force the sender to be dropped without being consumed by send()
805 drop(sender);
806 // Receiver will be dropped at end of scope
807 }
808
809 // Create a new event to verify the pool is still functional
810 let (sender2, receiver2) = pool.bind_by_ref();
811 sender2.send(123);
812 let value = futures::executor::block_on(receiver2).unwrap();
813 assert_eq!(value, 123);
814 });
815 }
816
817 #[test]
818 fn by_ref_receiver_drop_cleanup() {
819 with_watchdog(|| {
820 let pool = OnceEventPool::<i32>::new();
821 {
822 let (_sender, receiver) = pool.bind_by_ref();
823
824 // Force the receiver to be dropped without being consumed by recv()
825 drop(receiver);
826 // Sender will be dropped at end of scope
827 }
828
829 // Create a new event to verify the pool is still functional
830 let (sender2, receiver2) = pool.bind_by_ref();
831 sender2.send(456);
832 let value = futures::executor::block_on(receiver2).unwrap();
833 assert_eq!(value, 456);
834 });
835 }
836
837 #[test]
838 fn by_arc_sender_drop_cleanup() {
839 with_watchdog(|| {
840 let pool = Arc::new(OnceEventPool::<i32>::new());
841 let (sender, _receiver) = pool.bind_by_arc();
842
843 // Force the sender to be dropped without being consumed by send()
844 drop(sender);
845
846 // Create a new event to verify the pool is still functional
847 let (sender2, receiver2) = pool.bind_by_arc();
848 sender2.send(654);
849 let value = futures::executor::block_on(receiver2).unwrap();
850 assert_eq!(value, 654);
851 });
852 }
853
854 #[test]
855 fn by_arc_receiver_drop_cleanup() {
856 with_watchdog(|| {
857 let pool = Arc::new(OnceEventPool::<i32>::new());
858 let (_sender, receiver) = pool.bind_by_arc();
859
860 // Force the receiver to be dropped without being consumed by recv()
861 drop(receiver);
862
863 // Create a new event to verify the pool is still functional
864 let (sender2, receiver2) = pool.bind_by_arc();
865 sender2.send(987);
866 let value = futures::executor::block_on(receiver2).unwrap();
867 assert_eq!(value, 987);
868 });
869 }
870
871 #[test]
872 fn by_ptr_sender_drop_cleanup() {
873 with_watchdog(|| {
874 let pool = Box::pin(OnceEventPool::<i32>::new());
875
876 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
877 let (sender, _receiver) = unsafe { pool.as_ref().bind_by_ptr() };
878
879 // Force the sender to be dropped without being consumed by send()
880 drop(sender);
881
882 // Create a new event to verify the pool is still functional
883 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
884 let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
885 sender2.send(147);
886 let value = futures::executor::block_on(receiver2).unwrap();
887 assert_eq!(value, 147);
888 });
889 }
890
891 #[test]
892 fn by_ptr_receiver_drop_cleanup() {
893 with_watchdog(|| {
894 let pool = Box::pin(OnceEventPool::<i32>::new());
895
896 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
897 let (_sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
898
899 // Force the receiver to be dropped without being consumed by recv()
900 drop(receiver);
901
902 // Create a new event to verify the pool is still functional
903 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
904 let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
905 sender2.send(258);
906 let value = futures::executor::block_on(receiver2).unwrap();
907 assert_eq!(value, 258);
908 });
909 }
910
911 #[test]
912 fn dec_ref_and_cleanup_is_called() {
913 with_watchdog(|| {
914 let pool = OnceEventPool::<i32>::new();
915
916 // Create multiple events and drop them without using
917 for _ in 0..5 {
918 let (sender, receiver) = pool.bind_by_ref();
919 drop(sender);
920 drop(receiver);
921 }
922
923 // Verify pool still works correctly after cleanup
924 let (sender, receiver) = pool.bind_by_ref();
925 sender.send(999);
926 let value = futures::executor::block_on(receiver).unwrap();
927 assert_eq!(value, 999);
928 });
929 }
930
931 #[test]
932 fn pool_cleanup_verified_by_capacity() {
933 with_watchdog(|| {
934 let pool = OnceEventPool::<i32>::new();
935
936 // Create many events and drop them without using - this should not grow the pool permanently
937 for i in 0..10 {
938 let (sender, receiver) = pool.bind_by_ref();
939 if i % 2 == 0 {
940 drop(sender);
941 drop(receiver);
942 } else {
943 // Use some events normally
944 sender.send(i);
945 let _value = futures::executor::block_on(receiver);
946 }
947 }
948
949 // The pool should have cleaned up unused events
950 // If cleanup is broken, the pool would retain all the unused events
951 // This is a bit of an implementation detail but it's necessary to catch the leak
952
953 // Create one more event to verify pool still works
954 let (sender, receiver) = pool.bind_by_ref();
955 sender.send(42);
956 let value = futures::executor::block_on(receiver).unwrap();
957 assert_eq!(value, 42);
958 });
959 }
960
961 #[test]
962 fn pool_stress_test_no_leak() {
963 with_watchdog(|| {
964 let pool = OnceEventPool::<u64>::new();
965
966 // Stress test with many dropped events
967 for _ in 0..100 {
968 let (sender, receiver) = pool.bind_by_ref();
969 // Drop without using
970 drop(sender);
971 drop(receiver);
972 }
973
974 // Pool should still work efficiently
975 let (sender, receiver) = pool.bind_by_ref();
976 sender.send(999);
977 let value = futures::executor::block_on(receiver).unwrap();
978 assert_eq!(value, 999);
979 });
980 }
981
982 #[test]
983 fn by_ref_drop_actually_cleans_up_pool() {
984 let pool = OnceEventPool::<u32>::new();
985
986 // Create many events but drop them without use
987 for _ in 0..100 {
988 let (_sender, _receiver) = pool.bind_by_ref();
989 // Both sender and receiver will be dropped here
990 }
991
992 // Pool should be cleaned up - all events should be removed
993 // If Drop implementations don't work, pool will retain unused events
994 let mut pool_guard = pool.pool.lock().unwrap();
995 assert_eq!(
996 pool_guard.len(),
997 0,
998 "Pool still contains unused events - Drop implementations not working"
999 );
1000
1001 // An empty pool should be able to shrink to capacity 0
1002 pool_guard.shrink_to_fit();
1003 assert_eq!(
1004 pool_guard.capacity(),
1005 0,
1006 "Empty pool should shrink to capacity 0"
1007 );
1008 }
1009
1010 #[test]
1011 fn by_arc_drop_actually_cleans_up_pool() {
1012 let pool = Arc::new(OnceEventPool::<u32>::new());
1013
1014 // Create many events but drop them without use
1015 for _ in 0..100 {
1016 let (_sender, _receiver) = pool.bind_by_arc();
1017 // Both sender and receiver will be dropped here
1018 }
1019
1020 // Pool should be cleaned up - all events should be removed
1021 let mut pool_guard = pool.pool.lock().unwrap();
1022 assert_eq!(
1023 pool_guard.len(),
1024 0,
1025 "Pool still contains unused events - Drop implementations not working"
1026 );
1027
1028 // An empty pool should be able to shrink to capacity 0
1029 pool_guard.shrink_to_fit();
1030 assert_eq!(
1031 pool_guard.capacity(),
1032 0,
1033 "Empty pool should shrink to capacity 0"
1034 );
1035 }
1036
1037 #[test]
1038 fn by_ptr_drop_actually_cleans_up_pool() {
1039 // Test ptr-based pooled events cleanup by checking pool state
1040 for iteration in 0..10 {
1041 {
1042 let pool = Box::pin(OnceEventPool::<u32>::new());
1043 // SAFETY: We pin the pool for the duration of by_ptr call
1044 let (_sender, _receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1045 // sender and receiver will be dropped here
1046 }
1047
1048 // For this test, we'll verify that repeated operations don't accumulate
1049 // If Drop implementations don't work, we'd see memory accumulation
1050 println!("Iteration {iteration}: Pool operations completed");
1051 }
1052 }
1053
1054 #[test]
1055 fn dec_ref_and_cleanup_actually_removes_events() {
1056 let pool = OnceEventPool::<u32>::new();
1057
1058 // Test 1: Check that events are added to pool
1059 let pool_len_before = {
1060 let pool_guard = pool.pool.lock().unwrap();
1061 pool_guard.len()
1062 };
1063
1064 // Create events in a scope to ensure they're dropped
1065 let sender_key = {
1066 let (sender, receiver) = pool.bind_by_ref();
1067 let key = sender.key;
1068
1069 // Events should be in pool now (don't check len while borrowed)
1070
1071 drop(sender);
1072 drop(receiver);
1073 key
1074 };
1075
1076 // Now check that cleanup worked
1077 let pool_len_after = {
1078 let pool_guard = pool.pool.lock().unwrap();
1079 pool_guard.len()
1080 };
1081 assert_eq!(
1082 pool_len_after, pool_len_before,
1083 "Pool not cleaned up after dropping events - dec_ref_and_cleanup not working, \
1084 key: {sender_key:?}"
1085 );
1086 }
1087
#[test]
fn shrink_to_fit_with_empty_pool_shrinks_to_zero() {
    let pool = OnceEventPool::<u32>::new();

    // Bind ten events one after another, discarding both endpoints immediately each time.
    (0..10).for_each(|_| drop(pool.bind_by_ref()));

    // Nothing is bound any more, so the pool must report itself as empty.
    assert_eq!(pool.len(), 0);
    assert!(pool.is_empty());

    pool.shrink_to_fit();

    // Inspect the capacity of the underlying pinned pool directly.
    let capacity_after_shrink = pool.pool.lock().unwrap().capacity();
    assert_eq!(
        capacity_after_shrink, 0,
        "Empty pool should shrink to capacity 0"
    );
}
1109
#[test]
fn event_removed_from_pool_after_endpoints_immediate_drop() {
    let pool = OnceEventPool::<u32>::new();

    // Bind an event and let go of both endpoints right away, without sending or receiving.
    let endpoints = pool.bind_by_ref();
    drop(endpoints);

    // The pool must not retain the event once neither endpoint exists.
    assert_eq!(pool.len(), 0);
    assert!(pool.is_empty());
}
1119
#[test]
fn pool_len_and_is_empty_methods() {
    let pool = OnceEventPool::<u32>::new();

    // Each checkpoint compares (len, is_empty) as a pair against the expected pool state.

    // A freshly created pool holds nothing.
    assert_eq!((pool.len(), pool.is_empty()), (0, true));

    // One bound event.
    let (first_sender, first_receiver) = pool.bind_by_ref();
    assert_eq!((pool.len(), pool.is_empty()), (1, false));

    // A second event bound while the first is still alive.
    let (second_sender, second_receiver) = pool.bind_by_ref();
    assert_eq!((pool.len(), pool.is_empty()), (2, false));

    // Releasing both endpoints of the first event leaves only the second one.
    drop(first_sender);
    drop(first_receiver);
    assert_eq!((pool.len(), pool.is_empty()), (1, false));

    // Releasing the second event's endpoints empties the pool again.
    drop(second_sender);
    drop(second_receiver);
    assert_eq!((pool.len(), pool.is_empty()), (0, true));
}
1150
#[test]
fn pooled_event_receiver_gets_disconnected_when_sender_dropped() {
    with_watchdog(|| {
        let pool = OnceEventPool::<i32>::new();

        futures::executor::block_on(async {
            let (sender, receiver) = pool.bind_by_ref();

            // The sender goes away without ever sending a value.
            drop(sender);

            // Awaiting the receiver must therefore resolve to a Disconnected error.
            let outcome = receiver.await;
            assert!(outcome.is_err());
            assert!(matches!(outcome, Err(Disconnected)));
        });
    });
}
1168
#[test]
fn pooled_event_by_arc_receiver_gets_disconnected_when_sender_dropped() {
    with_watchdog(|| {
        let pool = Arc::new(OnceEventPool::<i32>::new());

        futures::executor::block_on(async {
            let (sender, receiver) = pool.bind_by_arc();

            // The sender goes away without ever sending a value.
            drop(sender);

            // Awaiting the receiver must therefore resolve to a Disconnected error.
            let outcome = receiver.await;
            assert!(outcome.is_err());
            assert!(matches!(outcome, Err(Disconnected)));
        });
    });
}
1186
#[test]
fn pooled_event_by_ptr_receiver_gets_disconnected_when_sender_dropped() {
    with_watchdog(|| {
        let pool = Box::pin(OnceEventPool::<i32>::new());

        futures::executor::block_on(async {
            // SAFETY: The pool is pinned on the heap and declared outside this future, so it
            // outlives both the sender and the receiver created here.
            let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };

            // The sender goes away without ever sending a value.
            drop(sender);

            // Awaiting the receiver must therefore resolve to a Disconnected error.
            let outcome = receiver.await;
            assert!(outcome.is_err());
            assert!(matches!(outcome, Err(Disconnected)));
        });
    });
}
1206
#[test]
fn pooled_sender_dropped_when_awaiting_signals_disconnected() {
    let pool = OnceEventPool::<i32>::new();
    let (sender, receiver) = pool.bind_by_ref();
    let mut receiver = pin!(receiver);

    // First poll happens while the sender is still alive and nothing has been sent.
    let mut cx = task::Context::from_waker(noop_waker_ref());
    let first_poll = receiver.as_mut().poll(&mut cx);
    assert!(matches!(first_poll, task::Poll::Pending));

    // Dropping the sender after the receiver has already been polled...
    drop(sender);

    // ...must make the next poll complete with a Disconnected error.
    let mut cx = task::Context::from_waker(noop_waker_ref());
    let second_poll = receiver.as_mut().poll(&mut cx);
    assert!(matches!(
        second_poll,
        task::Poll::Ready(Err(Disconnected))
    ));
}
1227
#[cfg(debug_assertions)]
#[test]
fn inspect_awaiters_empty_pool() {
    let pool = OnceEventPool::<i32>::new();

    // No events were ever bound, so the callback must never be invoked.
    let mut awaiters_seen = 0;
    pool.inspect_awaiters(|_| awaiters_seen += 1);

    assert_eq!(awaiters_seen, 0);
}
1240
#[cfg(debug_assertions)]
#[test]
fn inspect_awaiters_no_awaiters() {
    let pool = OnceEventPool::<String>::new();

    // Two events are bound and kept alive, but neither receiver is ever polled.
    let (_first_sender, _first_receiver) = pool.bind_by_ref();
    let (_second_sender, _second_receiver) = pool.bind_by_ref();

    let mut awaiters_seen = 0;
    pool.inspect_awaiters(|_| awaiters_seen += 1);

    // Bound-but-unpolled events must not be reported.
    assert_eq!(awaiters_seen, 0);
}
1257
#[cfg(debug_assertions)]
#[test]
fn inspect_awaiters_with_awaiters() {
    let pool = OnceEventPool::<i32>::new();

    // Bind two events; the senders are kept alive but never used.
    let (_first_sender, first_receiver) = pool.bind_by_ref();
    let (_second_sender, second_receiver) = pool.bind_by_ref();

    let mut first_receiver = pin!(first_receiver);
    let mut second_receiver = pin!(second_receiver);
    let mut cx = task::Context::from_waker(noop_waker_ref());

    // Poll each receiver once; the poll results themselves are not inspected.
    let _first_poll = first_receiver.as_mut().poll(&mut cx);
    let _second_poll = second_receiver.as_mut().poll(&mut cx);

    let mut awaiters_seen = 0;
    pool.inspect_awaiters(|_backtrace| awaiters_seen += 1);

    // Both polled receivers must be reported.
    assert_eq!(awaiters_seen, 2);
}
1282
#[cfg(debug_assertions)]
#[test]
fn inspect_awaiters_mixed_states() {
    let pool = OnceEventPool::<String>::new();

    // Three events: the first and third get polled, the second is completed without polling.
    let (_polled_sender_a, polled_receiver_a) = pool.bind_by_ref();
    let (completing_sender, unpolled_receiver) = pool.bind_by_ref();
    let (_polled_sender_b, polled_receiver_b) = pool.bind_by_ref();

    let mut polled_receiver_a = pin!(polled_receiver_a);
    let mut polled_receiver_b = pin!(polled_receiver_b);
    let mut cx = task::Context::from_waker(noop_waker_ref());

    // Poll only the first and third receivers; the poll results are not inspected.
    let _poll_a = polled_receiver_a.as_mut().poll(&mut cx);
    let _poll_b = polled_receiver_b.as_mut().poll(&mut cx);

    // Send on the second event and discard its receiver without ever polling it.
    completing_sender.send("completed".to_string());
    drop(unpolled_receiver);

    let mut awaiters_seen = 0;
    pool.inspect_awaiters(|_backtrace| awaiters_seen += 1);

    // Only the two polled receivers count as awaiters.
    assert_eq!(awaiters_seen, 2);
}
1313
#[test]
fn thread_safety() {
    // The pool itself is shared between threads, so it must be both Send and Sync.
    assert_impl_all!(OnceEventPool<u32>: Send, Sync);

    // Endpoints are consumed by a single owner: they may be moved to another thread (Send)
    // but are not meant to be shared between threads (no Sync). The same expectation is
    // checked for every pool reference flavour.

    // Bound by reference.
    assert_impl_all!(PooledOnceSender<RefPool<'static, u32>>: Send);
    assert_not_impl_any!(PooledOnceSender<RefPool<'static, u32>>: Sync);
    assert_impl_all!(PooledOnceReceiver<RefPool<'static, u32>>: Send);
    assert_not_impl_any!(PooledOnceReceiver<RefPool<'static, u32>>: Sync);

    // Bound by Arc.
    assert_impl_all!(PooledOnceSender<ArcPool<u32>>: Send);
    assert_not_impl_any!(PooledOnceSender<ArcPool<u32>>: Sync);
    assert_impl_all!(PooledOnceReceiver<ArcPool<u32>>: Send);
    assert_not_impl_any!(PooledOnceReceiver<ArcPool<u32>>: Sync);

    // Bound by raw pointer.
    assert_impl_all!(PooledOnceSender<PtrPool<u32>>: Send);
    assert_not_impl_any!(PooledOnceSender<PtrPool<u32>>: Sync);
    assert_impl_all!(PooledOnceReceiver<PtrPool<u32>>: Send);
    assert_not_impl_any!(PooledOnceReceiver<PtrPool<u32>>: Sync);
}
1334}