events/once/pooled_sync.rs
1//! Pooled events that provide automatic resource management.
2//!
3//! This module provides pooled variants of events that automatically manage their lifecycle
4//! using reference counting. Events are created from pools and automatically returned to the
5//! pool when both sender and receiver are dropped.
6
7#[cfg(debug_assertions)]
8use std::backtrace::Backtrace;
9use std::fmt::Debug;
10use std::future::Future;
11use std::marker::PhantomPinned;
12use std::mem::ManuallyDrop;
13use std::ops::Deref;
14use std::pin::Pin;
15use std::ptr::{self, NonNull};
16use std::sync::{Arc, Mutex};
17use std::{any, fmt, task};
18
19use infinity_pool::{RawPinnedPool, RawPooled};
20
21use crate::{Disconnected, ERR_POISONED_LOCK, OnceEvent, ReflectiveTSend, Sealed};
22
23/// A pool that manages thread-safe events with automatic cleanup.
24///
25/// The pool creates events on demand and automatically cleans them up when both
26/// sender and receiver endpoints are dropped.
27///
28/// This pool provides zero-allocation event reuse for high-frequency eventing scenarios
29/// in a thread-safe manner.
30///
31/// # Example
32///
33/// ```rust
34/// use events::OnceEventPool;
35/// # use futures::executor::block_on;
36///
37/// # block_on(async {
38/// let pool = OnceEventPool::<i32>::new();
39///
40/// // First usage - creates new event
41/// let (sender1, receiver1) = pool.bind_by_ref();
42/// sender1.send(42);
43/// let value1 = receiver1.await.unwrap();
44/// assert_eq!(value1, 42);
45/// // Event returned to pool when sender1/receiver1 are dropped
46///
47/// // Second usage - reuses the same event instance (efficient!)
48/// let (sender2, receiver2) = pool.bind_by_ref();
49/// sender2.send(100);
50/// let value2 = receiver2.await.unwrap();
51/// assert_eq!(value2, 100);
52/// // Same event reused - no additional allocation overhead
53/// # });
54/// ```
55pub struct OnceEventPool<T>
56where
57 T: Send,
58{
59 pool: Mutex<RawPinnedPool<OnceEvent<T>>>,
60
61 // It is invalid to move this type once it has been pinned.
62 _requires_pinning: PhantomPinned,
63}
64
65impl<T> Debug for OnceEventPool<T>
66where
67 T: Send,
68{
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 f.debug_struct("OnceEventPool")
71 .field("item_type", &format_args!("{}", any::type_name::<T>()))
72 .finish_non_exhaustive()
73 }
74}
75
76impl<T> OnceEventPool<T>
77where
78 T: Send,
79{
80 /// Creates a new empty event pool.
81 ///
82 /// # Example
83 ///
84 /// ```rust
85 /// use events::OnceEventPool;
86 ///
87 /// let pool = OnceEventPool::<String>::new();
88 /// ```
89 #[must_use]
90 pub fn new() -> Self {
91 Self {
92 pool: Mutex::new(RawPinnedPool::new()),
93 _requires_pinning: PhantomPinned,
94 }
95 }
96
97 /// Creates sender and receiver endpoints connected by reference to the pool.
98 ///
99 /// The pool will create a new event and return endpoints that reference it.
100 /// When both endpoints are dropped, the event will be automatically cleaned up.
101 ///
102 /// # Example
103 ///
104 /// ```rust
105 /// use events::OnceEventPool;
106 /// # use futures::executor::block_on;
107 ///
108 /// # block_on(async {
109 /// let pool = OnceEventPool::<i32>::new();
110 ///
111 /// // First event usage
112 /// let (sender1, receiver1) = pool.bind_by_ref();
113 /// sender1.send(42);
114 /// let value1 = receiver1.await.unwrap();
115 /// assert_eq!(value1, 42);
116 ///
117 /// // Second event usage - efficiently reuses the same underlying event
118 /// let (sender2, receiver2) = pool.bind_by_ref();
119 /// sender2.send(100);
120 /// let value2 = receiver2.await.unwrap();
121 /// assert_eq!(value2, 100);
122 /// # });
123 /// ```
124 #[must_use]
125 pub fn bind_by_ref(
126 &self,
127 ) -> (
128 PooledOnceSender<RefPool<'_, T>>,
129 PooledOnceReceiver<RefPool<'_, T>>,
130 ) {
131 let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
132
133 // SAFETY: We rely on OnceEvent::new_in_place_bound() for correct initialization.
134 let handle = unsafe { inner_pool.insert_with(OnceEvent::new_in_place_bound) };
135
136 let pool_ref = RefPool { pool: self };
137
138 let shared_handle = handle.into_shared();
139
140 (
141 PooledOnceSender {
142 pool_ref: pool_ref.clone(),
143 event: shared_handle,
144 },
145 PooledOnceReceiver {
146 pool_ref,
147 event: Some(shared_handle),
148 },
149 )
150 }
151
152 /// Creates sender and receiver endpoints connected by Arc to the pool.
153 ///
154 /// The pool will create a new event and return endpoints that hold Arc references.
155 /// When both endpoints are dropped, the event will be automatically cleaned up.
156 ///
157 /// # Example
158 ///
159 /// ```rust
160 /// use std::sync::Arc;
161 ///
162 /// use events::OnceEventPool;
163 ///
164 /// let pool = Arc::new(OnceEventPool::<i32>::new());
165 ///
166 /// // First usage
167 /// let (sender1, receiver1) = pool.bind_by_arc();
168 /// sender1.send(42);
169 /// let value1 = futures::executor::block_on(receiver1).unwrap();
170 /// assert_eq!(value1, 42);
171 ///
172 /// // Second usage - efficiently reuses the same pooled event
173 /// let (sender2, receiver2) = pool.bind_by_arc();
174 /// sender2.send(200);
175 /// let value2 = futures::executor::block_on(receiver2).unwrap();
176 /// assert_eq!(value2, 200);
177 /// ```
178 #[must_use]
179 pub fn bind_by_arc(
180 self: &Arc<Self>,
181 ) -> (PooledOnceSender<ArcPool<T>>, PooledOnceReceiver<ArcPool<T>>) {
182 let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
183
184 // SAFETY: We rely on OnceEvent::new_in_place_bound() for correct initialization.
185 let handle = unsafe { inner_pool.insert_with(OnceEvent::new_in_place_bound) };
186
187 let pool_ref = ArcPool {
188 pool: Arc::clone(self),
189 };
190
191 let shared_handle = handle.into_shared();
192
193 (
194 PooledOnceSender {
195 pool_ref: pool_ref.clone(),
196 event: shared_handle,
197 },
198 PooledOnceReceiver {
199 pool_ref,
200 event: Some(shared_handle),
201 },
202 )
203 }
204
205 /// Creates sender and receiver endpoints connected by raw pointer to the pool.
206 ///
207 /// The pool will create a new event and return endpoints that hold raw pointers.
208 /// When both endpoints are dropped, the event will be automatically cleaned up.
209 ///
210 /// # Safety
211 ///
212 /// The caller must ensure that:
213 /// - The pool remains valid and pinned for the entire lifetime of the sender and receiver
214 /// - The sender and receiver are dropped before the pool is dropped
215 ///
216 /// # Example
217 ///
218 /// ```rust
219 /// use events::OnceEventPool;
220 ///
221 /// let pool = Box::pin(OnceEventPool::<i32>::new());
222 ///
223 /// // First usage
224 /// // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
225 /// let (sender1, receiver1) = unsafe { pool.as_ref().bind_by_ptr() };
226 /// sender1.send(42);
227 /// let value1 = futures::executor::block_on(receiver1).unwrap();
228 /// assert_eq!(value1, 42);
229 ///
230 /// // Second usage - reuses the same event from the pool efficiently
231 /// // SAFETY: Pool is still valid and pinned
232 /// let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
233 /// sender2.send(100);
234 /// let value2 = futures::executor::block_on(receiver2).unwrap();
235 /// assert_eq!(value2, 100);
236 /// // Both sender and receiver pairs are dropped here, before pool
237 /// ```
238 #[must_use]
239 pub unsafe fn bind_by_ptr(
240 self: Pin<&Self>,
241 ) -> (PooledOnceSender<PtrPool<T>>, PooledOnceReceiver<PtrPool<T>>) {
242 let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
243
244 // SAFETY: We rely on OnceEvent::new_in_place_bound() for correct initialization.
245 let handle = unsafe { inner_pool.insert_with(OnceEvent::new_in_place_bound) };
246
247 let pool_ref = PtrPool {
248 pool: NonNull::from(self.get_ref()),
249 };
250
251 let shared_handle = handle.into_shared();
252
253 (
254 PooledOnceSender {
255 pool_ref: pool_ref.clone(),
256 event: shared_handle,
257 },
258 PooledOnceReceiver {
259 pool_ref,
260 event: Some(shared_handle),
261 },
262 )
263 }
264
265 /// Returns the number of events currently in the pool.
266 ///
267 /// This represents the count of events that are currently allocated in the pool,
268 /// including those that are currently bound to sender/receiver endpoints.
269 /// Events are removed from the pool only when both endpoints are dropped.
270 ///
271 /// # Example
272 ///
273 /// ```rust
274 /// use events::OnceEventPool;
275 ///
276 /// let pool = OnceEventPool::<i32>::new();
277 /// assert_eq!(pool.len(), 0);
278 ///
279 /// let (sender, receiver) = pool.bind_by_ref();
280 /// assert_eq!(pool.len(), 1); // Event is in pool while endpoints exist
281 ///
282 /// drop(sender);
283 /// drop(receiver);
284 /// assert_eq!(pool.len(), 0); // Event cleaned up after both endpoints dropped
285 /// ```
286 #[must_use]
287 pub fn len(&self) -> usize {
288 self.pool.lock().expect(ERR_POISONED_LOCK).len()
289 }
290
291 /// Returns whether the pool is empty.
292 ///
293 /// This is equivalent to `pool.len() == 0` but may be more efficient.
294 /// An empty pool may still have reserved capacity.
295 ///
296 /// # Example
297 ///
298 /// ```rust
299 /// use events::OnceEventPool;
300 ///
301 /// let pool = OnceEventPool::<i32>::new();
302 /// assert!(pool.is_empty());
303 ///
304 /// let (sender, receiver) = pool.bind_by_ref();
305 /// assert!(!pool.is_empty()); // Pool has event while endpoints exist
306 ///
307 /// drop(sender);
308 /// drop(receiver);
309 /// assert!(pool.is_empty()); // Pool empty after both endpoints dropped
310 /// ```
311 #[must_use]
312 pub fn is_empty(&self) -> bool {
313 self.pool.lock().expect(ERR_POISONED_LOCK).is_empty()
314 }
315
316 /// Shrinks the capacity of the pool to reduce memory usage.
317 ///
318 /// This method attempts to release unused memory by reducing the pool's capacity.
319 /// The actual reduction is implementation-dependent and may vary - some capacity
320 /// may be released, or none at all.
321 ///
322 /// # Example
323 ///
324 /// ```rust
325 /// use events::OnceEventPool;
326 ///
327 /// let pool = OnceEventPool::<i32>::new();
328 ///
329 /// // Use the pool which may grow its capacity
330 /// for _ in 0..100 {
331 /// let (sender, receiver) = pool.bind_by_ref();
332 /// sender.send(42);
333 /// let _value = futures::executor::block_on(receiver);
334 /// }
335 ///
336 /// // Attempt to shrink to reduce memory usage
337 /// pool.shrink_to_fit();
338 /// ```
339 pub fn shrink_to_fit(&self) {
340 let mut inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
341 inner_pool.shrink_to_fit();
342 }
343
344 /// Uses the provided closure to inspect the backtraces of the most recent awaiter of each
345 /// event in the pool (or `None` if it has never been awaited).
346 ///
347 /// This method is only available in debug builds (`cfg(debug_assertions)`).
348 /// For any data to be present, `RUST_BACKTRACE=1` or `RUST_LIB_BACKTRACE=1` must be set.
349 ///
350 /// The closure is called once for each event in the pool that is currently being awaited by
351 /// someone.
352 #[cfg(debug_assertions)]
353 pub fn inspect_awaiters(&self, mut f: impl FnMut(Option<&Backtrace>)) {
354 let inner_pool = self.pool.lock().expect(ERR_POISONED_LOCK);
355
356 for event_ptr in inner_pool.iter() {
357 // SAFETY: The pool remains alive for the duration of this function call, satisfying
358 // the lifetime requirement. The pointer is valid as it comes from the pool's iterator.
359 // We only ever create shared references to the events, so no conflicting exclusive
360 // references can exist.
361 let event = unsafe { event_ptr.as_ref() };
362 event.inspect_awaiter(&mut f);
363 }
364 }
365}
366
367impl<T> Default for OnceEventPool<T>
368where
369 T: Send,
370{
371 fn default() -> Self {
372 Self::new()
373 }
374}
375
376/// Enables a sender or receiver to reference the pool that stores the event that connects them.
377///
378/// This is a sealed trait and exists for internal use only. You never need to use it.
379#[expect(private_bounds, reason = "intentional - sealed trait")]
380pub trait PoolRef<T>: Deref<Target = OnceEventPool<T>> + ReflectiveTSend + Debug + Sealed
381where
382 T: Send,
383{
384}
385
386/// An event pool referenced via `&` shared reference.
387///
388/// Only used in type names. Instances are created internally by [`OnceEventPool`].
389#[derive(Copy)]
390pub struct RefPool<'a, T>
391where
392 T: Send,
393{
394 pool: &'a OnceEventPool<T>,
395}
396
397impl<T> Debug for RefPool<'_, T>
398where
399 T: Send,
400{
401 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
402 f.debug_struct("RefPool")
403 .field("item_type", &format_args!("{}", any::type_name::<T>()))
404 .finish_non_exhaustive()
405 }
406}
407
408impl<T> Sealed for RefPool<'_, T> where T: Send {}
409impl<T> PoolRef<T> for RefPool<'_, T> where T: Send {}
410impl<T> Deref for RefPool<'_, T>
411where
412 T: Send,
413{
414 type Target = OnceEventPool<T>;
415
416 fn deref(&self) -> &Self::Target {
417 self.pool
418 }
419}
420impl<T> Clone for RefPool<'_, T>
421where
422 T: Send,
423{
424 fn clone(&self) -> Self {
425 Self { pool: self.pool }
426 }
427}
428impl<T: Send> ReflectiveTSend for RefPool<'_, T> {
429 type T = T;
430}
431
432/// An event pool referenced via `Arc` shared reference.
433///
434/// Only used in type names. Instances are created internally by [`OnceEventPool`].
435pub struct ArcPool<T>
436where
437 T: Send,
438{
439 pool: Arc<OnceEventPool<T>>,
440}
441
442impl<T> Debug for ArcPool<T>
443where
444 T: Send,
445{
446 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
447 f.debug_struct("ArcPool")
448 .field("item_type", &format_args!("{}", any::type_name::<T>()))
449 .finish_non_exhaustive()
450 }
451}
452
453impl<T> Sealed for ArcPool<T> where T: Send {}
454impl<T> PoolRef<T> for ArcPool<T> where T: Send {}
455impl<T> Deref for ArcPool<T>
456where
457 T: Send,
458{
459 type Target = OnceEventPool<T>;
460
461 fn deref(&self) -> &Self::Target {
462 &self.pool
463 }
464}
465impl<T> Clone for ArcPool<T>
466where
467 T: Send,
468{
469 fn clone(&self) -> Self {
470 Self {
471 pool: Arc::clone(&self.pool),
472 }
473 }
474}
475impl<T: Send> ReflectiveTSend for ArcPool<T> {
476 type T = T;
477}
478
479/// An event pool referenced via raw pointer.
480///
481/// Only used in type names. Instances are created internally by [`OnceEventPool`].
482#[derive(Copy)]
483pub struct PtrPool<T>
484where
485 T: Send,
486{
487 pool: NonNull<OnceEventPool<T>>,
488}
489
490impl<T> Debug for PtrPool<T>
491where
492 T: Send,
493{
494 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
495 f.debug_struct("PtrPool")
496 .field("item_type", &format_args!("{}", any::type_name::<T>()))
497 .finish_non_exhaustive()
498 }
499}
500
501impl<T> Sealed for PtrPool<T> where T: Send {}
502impl<T> PoolRef<T> for PtrPool<T> where T: Send {}
503impl<T> Deref for PtrPool<T>
504where
505 T: Send,
506{
507 type Target = OnceEventPool<T>;
508
509 fn deref(&self) -> &Self::Target {
510 // SAFETY: The owner of the sender/receiver pair is responsible for ensuring the pool
511 // outlives both, satisfying the lifetime requirement for as_ref().
512 unsafe { self.pool.as_ref() }
513 }
514}
515impl<T> Clone for PtrPool<T>
516where
517 T: Send,
518{
519 fn clone(&self) -> Self {
520 Self { pool: self.pool }
521 }
522}
523impl<T: Send> ReflectiveTSend for PtrPool<T> {
524 type T = T;
525}
526// SAFETY: This is only used with the thread-safe pool (the pool is Sync).
527unsafe impl<T> Send for PtrPool<T> where T: Send {}
528
529/// A receiver that can receive a single value through a thread-safe event.
530///
531/// The type of the value is the inner type parameter,
532/// i.e. the `T` in `PooledOnceReceiver<ArcPool<T>>`.
533///
534/// The outer type parameter determines the mechanism by which the endpoint is bound to the event
535/// pool. Different binding mechanisms offer different performance characteristics and resource
536/// management patterns.
537pub struct PooledOnceSender<P>
538where
539 P: PoolRef<<P as ReflectiveTSend>::T>,
540{
541 pool_ref: P,
542 event: RawPooled<OnceEvent<P::T>>,
543}
544
545impl<P> Debug for PooledOnceSender<P>
546where
547 P: PoolRef<<P as ReflectiveTSend>::T>,
548{
549 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550 f.debug_struct("PooledOnceSender")
551 .field("item_type", &format_args!("{}", any::type_name::<P::T>()))
552 .field("pool_ref", &self.pool_ref)
553 .field("event", &self.event)
554 .finish()
555 }
556}
557
558impl<P> PooledOnceSender<P>
559where
560 P: PoolRef<<P as ReflectiveTSend>::T>,
561{
562 /// Sends a value through the event.
563 ///
564 /// This method consumes the sender and always succeeds, regardless of whether
565 /// there is a receiver waiting.
566 ///
567 /// # Example
568 ///
569 /// ```rust
570 /// use events::OnceEventPool;
571 ///
572 /// let pool = OnceEventPool::new();
573 /// let (sender, receiver) = pool.bind_by_ref();
574 ///
575 /// sender.send(42);
576 /// let value = futures::executor::block_on(receiver).unwrap();
577 /// assert_eq!(value, 42);
578 /// ```
579 #[inline]
580 pub fn send(self, value: P::T) {
581 // The drop logic is different before/after set(), so we switch to manual drop here.
582 let mut this = ManuallyDrop::new(self);
583
584 // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
585 // for creating a reference. No exclusive references can exist because the events package
586 // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
587 let event = unsafe { this.event.as_ref() };
588
589 let set_result = event.set(value);
590
591 if set_result == Err(Disconnected) {
592 // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
593 // state machine ensures this is the only removal path for this event instance.
594 unsafe {
595 this.pool_ref
596 .pool
597 .lock()
598 .expect(ERR_POISONED_LOCK)
599 .remove(this.event);
600 }
601 }
602
603 // We also still need to drop the pool ref itself!
604 // SAFETY: This is a valid object and ManuallyDrop ensures it will not be auto-dropped.
605 unsafe {
606 ptr::drop_in_place(&raw mut this.pool_ref);
607 }
608 }
609}
610
611impl<P> Drop for PooledOnceSender<P>
612where
613 P: PoolRef<<P as ReflectiveTSend>::T>,
614{
615 #[inline]
616 fn drop(&mut self) {
617 // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
618 // for creating a reference. No exclusive references can exist because the events package
619 // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
620 let event = unsafe { self.event.as_ref() };
621
622 // This ensures receivers get Disconnected errors if the sender is dropped without sending.
623 if event.sender_dropped_without_set() == Err(Disconnected) {
624 // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
625 // state machine ensures this is the only removal path for this event instance.
626 unsafe {
627 self.pool_ref
628 .pool
629 .lock()
630 .expect(ERR_POISONED_LOCK)
631 .remove(self.event);
632 }
633 }
634 }
635}
636
637// SAFETY: The NonNull marks it !Send by default but we know that everything behind the pointer
638// is thread-safe, so all is well. We also require `Send` from `R` to be extra safe here.
639unsafe impl<P> Send for PooledOnceSender<P> where P: PoolRef<<P as ReflectiveTSend>::T> + Send {}
640
641/// A receiver that can receive a single value through a thread-safe event.
642///
643/// The type of the value is the inner type parameter,
644/// i.e. the `T` in `PooledOnceReceiver<ArcPool<T>>`.
645///
646/// The outer type parameter determines the mechanism by which the endpoint is bound to the event
647/// pool. Different binding mechanisms offer different performance characteristics and resource
648/// management patterns.
649pub struct PooledOnceReceiver<P>
650where
651 P: PoolRef<<P as ReflectiveTSend>::T>,
652{
653 pool_ref: P,
654 event: Option<RawPooled<OnceEvent<P::T>>>,
655}
656
657impl<P> Debug for PooledOnceReceiver<P>
658where
659 P: PoolRef<<P as ReflectiveTSend>::T>,
660{
661 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662 f.debug_struct("PooledOnceReceiver")
663 .field("item_type", &format_args!("{}", any::type_name::<P::T>()))
664 .field("pool_ref", &self.pool_ref)
665 .field("event", &self.event)
666 .finish()
667 }
668}
669
670impl<P> PooledOnceReceiver<P>
671where
672 P: PoolRef<<P as ReflectiveTSend>::T>,
673{
674 /// Checks whether a value is ready to be received.
675 ///
676 /// # Panics
677 ///
678 /// Panics if called after `poll()` has returned `Ready`.
679 pub fn is_ready(&self) -> bool {
680 let Some(event_handle) = &self.event else {
681 // Already pseudo-consumed the receiver as part of the Future impl.
682 panic!("receiver queried after completion");
683 };
684
685 // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
686 // for creating a reference. No exclusive references can exist because the events package
687 // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
688 let event = unsafe { event_handle.as_ref() };
689
690 event.is_set()
691 }
692
693 /// Consumes the receiver and transforms it into the received value, if the value is available.
694 ///
695 /// This method provides an alternative to awaiting the receiver when you want to check for
696 /// an immediately available value without blocking. It returns `Some(value)` if a value has
697 /// already been sent, or `None` if no value is currently available.
698 ///
699 /// # Panics
700 ///
701 /// Panics if the value has already been received via `Future::poll()`.
702 ///
703 /// # Examples
704 ///
705 /// ## Basic usage with Arc-based pool
706 ///
707 /// ```rust
708 /// use std::sync::Arc;
709 ///
710 /// use events::OnceEventPool;
711 ///
712 /// let pool = Arc::new(OnceEventPool::<String>::new());
713 /// let (sender, receiver) = pool.bind_by_arc();
714 /// sender.send("Hello from pool".to_string());
715 ///
716 /// // Value is immediately available
717 /// let value = receiver.into_value();
718 /// assert_eq!(value.unwrap(), Ok("Hello from pool".to_string()));
719 /// // Event is automatically returned to pool for reuse
720 /// ```
721 ///
722 /// ## No value available
723 ///
724 /// ```rust
725 /// use std::sync::Arc;
726 ///
727 /// use events::OnceEventPool;
728 ///
729 /// let pool = Arc::new(OnceEventPool::<i32>::new());
730 /// let (_sender, receiver) = pool.bind_by_arc();
731 ///
732 /// // No value sent yet - receiver is returned back
733 /// let result = receiver.into_value();
734 /// assert!(result.is_err()); // Returns Err(receiver)
735 /// // Event is still returned to pool
736 /// ```
737 ///
738 /// ## Using with reference-based pool binding
739 ///
740 /// ```rust
741 /// use events::OnceEventPool;
742 ///
743 /// let pool = OnceEventPool::<String>::new();
744 /// let (sender, receiver) = pool.bind_by_ref();
745 /// sender.send("Hello".to_string());
746 ///
747 /// let value = receiver.into_value().unwrap();
748 /// assert_eq!(value, Ok("Hello".to_string()));
749 /// ```
750 pub fn into_value(mut self) -> Result<Result<<P as ReflectiveTSend>::T, Disconnected>, Self> {
751 let Some(event_handle) = self.event.take() else {
752 // Already pseudo-consumed the receiver as part of the Future impl.
753 return Err(self);
754 };
755
756 // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
757 // for creating a reference. No exclusive references can exist because the events package
758 // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
759 let event = unsafe { event_handle.as_ref() };
760
761 // First check if the event is set (non-destructive)
762 if !event.is_set() {
763 // No value available yet and sender hasn't disconnected - return the receiver
764 self.event = Some(event_handle);
765 return Err(self);
766 }
767
768 // Event is set - use final_poll to determine if we have a value or disconnection
769 match event.final_poll() {
770 Ok(Some(value)) => {
771 // We have a value - clean up and return it
772 // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
773 // state machine ensures this is the only removal path for this event instance.
774 unsafe {
775 self.pool_ref
776 .pool
777 .lock()
778 .expect(ERR_POISONED_LOCK)
779 .remove(event_handle);
780 }
781 Ok(Ok(value))
782 }
783 Ok(None) => {
784 // This shouldn't happen since is_set() returned true, but handle it gracefully
785 self.event = Some(event_handle);
786 Err(self)
787 }
788 Err(Disconnected) => {
789 // Sender disconnected - clean up and return Disconnected
790 // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
791 // state machine ensures this is the only removal path for this event instance.
792 unsafe {
793 self.pool_ref
794 .pool
795 .lock()
796 .expect(ERR_POISONED_LOCK)
797 .remove(event_handle);
798 }
799 Ok(Err(Disconnected))
800 }
801 }
802 }
803
804 /// Drops the inner state, releasing the event back to the pool, returning the value (if any).
805 ///
806 /// May also be used from contexts where the receiver itself is not yet consumed.
807 fn drop_inner(&mut self) -> Option<<P as ReflectiveTSend>::T> {
808 let Some(event_handle) = self.event.take() else {
809 // Already pseudo-consumed the receiver as part of the Future impl.
810 return None;
811 };
812
813 // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
814 // for creating a reference. No exclusive references can exist because the events package
815 // only uses RawPooled<T> (shared handles), never RawPooledMut<T> (exclusive handles).
816 let event = unsafe { event_handle.as_ref() };
817
818 let final_poll_result = event.final_poll();
819
820 match final_poll_result {
821 Ok(Some(value)) => {
822 // The sender has disconnected and sent a value, so we need to clean up.
823
824 // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
825 // state machine ensures this is the only removal path for this event instance.
826 unsafe {
827 self.pool_ref
828 .pool
829 .lock()
830 .expect(ERR_POISONED_LOCK)
831 .remove(event_handle);
832 }
833
834 Some(value)
835 }
836 Ok(None) => {
837 // Nothing for us to do - the sender was still connected and had not
838 // sent any value, so it will perform the cleanup on its own.
839 None
840 }
841 Err(Disconnected) => {
842 // The sender has already disconnected, so we need to clean up the event.
843
844 // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
845 // state machine ensures this is the only removal path for this event instance.
846 unsafe {
847 self.pool_ref
848 .pool
849 .lock()
850 .expect(ERR_POISONED_LOCK)
851 .remove(event_handle);
852 }
853
854 None
855 }
856 }
857 }
858}
859
860impl<P> Future for PooledOnceReceiver<P>
861where
862 P: PoolRef<<P as ReflectiveTSend>::T>,
863{
864 type Output = Result<P::T, Disconnected>;
865
866 #[inline]
867 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
868 // SAFETY: We are not moving anything, just touching internal state.
869 let this = unsafe { self.get_unchecked_mut() };
870
871 let event_handle = this
872 .event
873 .expect("polling a Future after completion is invalid");
874
875 // SAFETY: The pool remains alive through the pool_ref, satisfying the lifetime requirement
876 // for creating a reference. The handle is valid as it was created from this same pool.
877 let event = unsafe { event_handle.as_ref() };
878
879 let poll_result = event.poll(cx.waker());
880
881 poll_result.map_or_else(
882 || task::Poll::Pending,
883 |value| {
884 // Any result from the inner poll means we were the last endpoint connected,
885 // so we have to clean up the event now.
886
887 // SAFETY: The handle belongs to this pool (created via insert_with()) and the event
888 // state machine ensures this is the only removal path for this event instance.
889 unsafe {
890 this.pool_ref
891 .pool
892 .lock()
893 .expect(ERR_POISONED_LOCK)
894 .remove(event_handle);
895 }
896
897 // The cleanup is already all done by poll() when it returns a result.
898 // This just ensures panic on double poll (otherwise we would violate memory safety).
899 this.event = None;
900
901 task::Poll::Ready(value)
902 },
903 )
904 }
905}
906
907impl<P> Drop for PooledOnceReceiver<P>
908where
909 P: PoolRef<<P as ReflectiveTSend>::T>,
910{
911 #[inline]
912 fn drop(&mut self) {
913 self.drop_inner();
914 }
915}
916
917// SAFETY: The NonNull marks it !Send by default but we know that everything behind the pointer
918// is thread-safe, so all is well. We also require `Send` from `R` to be extra safe here.
919unsafe impl<P> Send for PooledOnceReceiver<P> where P: PoolRef<<P as ReflectiveTSend>::T> + Send {}
920
921#[cfg(test)]
922mod tests {
923 use std::pin::pin;
924
925 use futures::task::noop_waker_ref;
926 use static_assertions::{assert_impl_all, assert_not_impl_any};
927 use testing::with_watchdog;
928
929 use super::*;
930
931 #[test]
932 fn event_pool_by_ref() {
933 with_watchdog(|| {
934 let pool = OnceEventPool::<i32>::new();
935
936 // Pool starts empty
937 assert_eq!(pool.len(), 0);
938 assert!(pool.is_empty());
939
940 let (sender, receiver) = pool.bind_by_ref();
941
942 // Pool should have 1 event while endpoints are bound
943 assert_eq!(pool.len(), 1);
944 assert!(!pool.is_empty());
945
946 // Receiver should not be ready before value is sent
947 assert!(!receiver.is_ready());
948
949 sender.send(42);
950
951 // Receiver should be ready after value is sent
952 assert!(receiver.is_ready());
953
954 let value = futures::executor::block_on(receiver).unwrap();
955 assert_eq!(value, 42);
956
957 // After endpoints are dropped, pool should be empty
958 assert_eq!(pool.len(), 0);
959 assert!(pool.is_empty());
960 });
961 }
962
963 #[test]
964 fn pool_drop_cleanup() {
965 with_watchdog(|| {
966 let pool = OnceEventPool::<i32>::new();
967
968 // Create and drop sender/receiver without using them
969 let (sender, receiver) = pool.bind_by_ref();
970 drop(sender);
971 drop(receiver);
972
973 // Pool should be empty (the event should have been cleaned up)
974 // This is implementation detail but shows the cleanup works
975 });
976 }
977
978 #[test]
979 fn pool_multiple_events() {
980 with_watchdog(|| {
981 let pool = OnceEventPool::<i32>::new();
982
983 // Test one event first
984 let (sender1, receiver1) = pool.bind_by_ref();
985 sender1.send(1);
986 let value1 = futures::executor::block_on(receiver1).unwrap();
987 assert_eq!(value1, 1);
988
989 // Test another event
990 let (sender2, receiver2) = pool.bind_by_ref();
991 sender2.send(2);
992 let value2 = futures::executor::block_on(receiver2).unwrap();
993 assert_eq!(value2, 2);
994 });
995 }
996
997 #[test]
998 fn event_pool_by_arc() {
999 with_watchdog(|| {
1000 let pool = Arc::new(OnceEventPool::<i32>::new());
1001
1002 // Pool starts empty
1003 assert_eq!(pool.len(), 0);
1004 assert!(pool.is_empty());
1005
1006 let (sender, receiver) = pool.bind_by_arc();
1007
1008 // Pool should have 1 event while endpoints are bound
1009 assert_eq!(pool.len(), 1);
1010 assert!(!pool.is_empty());
1011
1012 sender.send(42);
1013 let value = futures::executor::block_on(receiver).unwrap();
1014 assert_eq!(value, 42);
1015
1016 // After endpoints are dropped, pool should be empty
1017 assert_eq!(pool.len(), 0);
1018 assert!(pool.is_empty());
1019 });
1020 }
1021
1022 #[test]
1023 fn event_pool_by_ptr() {
1024 with_watchdog(|| {
1025 let pool = Box::pin(OnceEventPool::<i32>::new());
1026
1027 // Pool starts empty
1028 assert_eq!(pool.len(), 0);
1029 assert!(pool.is_empty());
1030
1031 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1032 let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1033
1034 // Pool should have 1 event while endpoints are bound
1035 assert_eq!(pool.len(), 1);
1036 assert!(!pool.is_empty());
1037
1038 sender.send(42);
1039 let value = futures::executor::block_on(receiver).unwrap();
1040 assert_eq!(value, 42);
1041
1042 // After endpoints are dropped, pool should be empty
1043 assert_eq!(pool.len(), 0);
1044 assert!(pool.is_empty());
1045 });
1046 }
1047
1048 // Memory leak detection tests - these specifically test that cleanup occurs on drop
1049 #[test]
1050 fn by_ref_sender_drop_cleanup() {
1051 with_watchdog(|| {
1052 let pool = OnceEventPool::<i32>::new();
1053 {
1054 let (sender, _receiver) = pool.bind_by_ref();
1055
1056 // Force the sender to be dropped without being consumed by send()
1057 drop(sender);
1058 // Receiver will be dropped at end of scope
1059 }
1060
1061 // Create a new event to verify the pool is still functional
1062 let (sender2, receiver2) = pool.bind_by_ref();
1063 sender2.send(123);
1064 let value = futures::executor::block_on(receiver2).unwrap();
1065 assert_eq!(value, 123);
1066 });
1067 }
1068
1069 #[test]
1070 fn by_ref_receiver_drop_cleanup() {
1071 with_watchdog(|| {
1072 let pool = OnceEventPool::<i32>::new();
1073 {
1074 let (_sender, receiver) = pool.bind_by_ref();
1075
1076 // Force the receiver to be dropped without being consumed by recv()
1077 drop(receiver);
1078 // Sender will be dropped at end of scope
1079 }
1080
1081 // Create a new event to verify the pool is still functional
1082 let (sender2, receiver2) = pool.bind_by_ref();
1083 sender2.send(456);
1084 let value = futures::executor::block_on(receiver2).unwrap();
1085 assert_eq!(value, 456);
1086 });
1087 }
1088
1089 #[test]
1090 fn by_arc_sender_drop_cleanup() {
1091 with_watchdog(|| {
1092 let pool = Arc::new(OnceEventPool::<i32>::new());
1093 let (sender, _receiver) = pool.bind_by_arc();
1094
1095 // Force the sender to be dropped without being consumed by send()
1096 drop(sender);
1097
1098 // Create a new event to verify the pool is still functional
1099 let (sender2, receiver2) = pool.bind_by_arc();
1100 sender2.send(654);
1101 let value = futures::executor::block_on(receiver2).unwrap();
1102 assert_eq!(value, 654);
1103 });
1104 }
1105
1106 #[test]
1107 fn by_arc_receiver_drop_cleanup() {
1108 with_watchdog(|| {
1109 let pool = Arc::new(OnceEventPool::<i32>::new());
1110 let (_sender, receiver) = pool.bind_by_arc();
1111
1112 // Force the receiver to be dropped without being consumed by recv()
1113 drop(receiver);
1114
1115 // Create a new event to verify the pool is still functional
1116 let (sender2, receiver2) = pool.bind_by_arc();
1117 sender2.send(987);
1118 let value = futures::executor::block_on(receiver2).unwrap();
1119 assert_eq!(value, 987);
1120 });
1121 }
1122
1123 #[test]
1124 fn by_ptr_sender_drop_cleanup() {
1125 with_watchdog(|| {
1126 let pool = Box::pin(OnceEventPool::<i32>::new());
1127
1128 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1129 let (sender, _receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1130
1131 // Force the sender to be dropped without being consumed by send()
1132 drop(sender);
1133
1134 // Create a new event to verify the pool is still functional
1135 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1136 let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
1137 sender2.send(147);
1138 let value = futures::executor::block_on(receiver2).unwrap();
1139 assert_eq!(value, 147);
1140 });
1141 }
1142
1143 #[test]
1144 fn by_ptr_receiver_drop_cleanup() {
1145 with_watchdog(|| {
1146 let pool = Box::pin(OnceEventPool::<i32>::new());
1147
1148 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1149 let (_sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1150
1151 // Force the receiver to be dropped without being consumed by recv()
1152 drop(receiver);
1153
1154 // Create a new event to verify the pool is still functional
1155 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1156 let (sender2, receiver2) = unsafe { pool.as_ref().bind_by_ptr() };
1157 sender2.send(258);
1158 let value = futures::executor::block_on(receiver2).unwrap();
1159 assert_eq!(value, 258);
1160 });
1161 }
1162
1163 #[test]
1164 fn dec_ref_and_cleanup_is_called() {
1165 with_watchdog(|| {
1166 let pool = OnceEventPool::<i32>::new();
1167
1168 // Create multiple events and drop them without using
1169 for _ in 0..5 {
1170 let (sender, receiver) = pool.bind_by_ref();
1171 drop(sender);
1172 drop(receiver);
1173 }
1174
1175 // Verify pool still works correctly after cleanup
1176 let (sender, receiver) = pool.bind_by_ref();
1177 sender.send(999);
1178 let value = futures::executor::block_on(receiver).unwrap();
1179 assert_eq!(value, 999);
1180 });
1181 }
1182
1183 #[test]
1184 fn pool_cleanup_verified_by_capacity() {
1185 with_watchdog(|| {
1186 let pool = OnceEventPool::<i32>::new();
1187
1188 // Create many events and drop them without using - this should not grow the pool permanently
1189 for i in 0..10 {
1190 let (sender, receiver) = pool.bind_by_ref();
1191 if i % 2 == 0 {
1192 drop(sender);
1193 drop(receiver);
1194 } else {
1195 // Use some events normally
1196 sender.send(i);
1197 let _value = futures::executor::block_on(receiver);
1198 }
1199 }
1200
1201 // The pool should have cleaned up unused events
1202 // If cleanup is broken, the pool would retain all the unused events
1203 // This is a bit of an implementation detail but it's necessary to catch the leak
1204
1205 // Create one more event to verify pool still works
1206 let (sender, receiver) = pool.bind_by_ref();
1207 sender.send(42);
1208 let value = futures::executor::block_on(receiver).unwrap();
1209 assert_eq!(value, 42);
1210 });
1211 }
1212
1213 #[test]
1214 fn pool_stress_test_no_leak() {
1215 with_watchdog(|| {
1216 let pool = OnceEventPool::<u64>::new();
1217
1218 // Stress test with many dropped events
1219 for _ in 0..100 {
1220 let (sender, receiver) = pool.bind_by_ref();
1221 // Drop without using
1222 drop(sender);
1223 drop(receiver);
1224 }
1225
1226 // Pool should still work efficiently
1227 let (sender, receiver) = pool.bind_by_ref();
1228 sender.send(999);
1229 let value = futures::executor::block_on(receiver).unwrap();
1230 assert_eq!(value, 999);
1231 });
1232 }
1233
1234 #[test]
1235 fn by_ref_drop_actually_cleans_up_pool() {
1236 let pool = OnceEventPool::<u32>::new();
1237
1238 // Create many events but drop them without use
1239 for _ in 0..100 {
1240 let (_sender, _receiver) = pool.bind_by_ref();
1241 // Both sender and receiver will be dropped here
1242 }
1243
1244 // Pool should be cleaned up - all events should be removed
1245 // If Drop implementations don't work, pool will retain unused events
1246 let mut pool_guard = pool.pool.lock().unwrap();
1247 assert_eq!(
1248 pool_guard.len(),
1249 0,
1250 "Pool still contains unused events - Drop implementations not working"
1251 );
1252
1253 // An empty pool should be able to shrink to capacity 0
1254 pool_guard.shrink_to_fit();
1255 assert_eq!(
1256 pool_guard.capacity(),
1257 0,
1258 "Empty pool should shrink to capacity 0"
1259 );
1260 }
1261
1262 #[test]
1263 fn by_arc_drop_actually_cleans_up_pool() {
1264 let pool = Arc::new(OnceEventPool::<u32>::new());
1265
1266 // Create many events but drop them without use
1267 for _ in 0..100 {
1268 let (_sender, _receiver) = pool.bind_by_arc();
1269 // Both sender and receiver will be dropped here
1270 }
1271
1272 // Pool should be cleaned up - all events should be removed
1273 let mut pool_guard = pool.pool.lock().unwrap();
1274 assert_eq!(
1275 pool_guard.len(),
1276 0,
1277 "Pool still contains unused events - Drop implementations not working"
1278 );
1279
1280 // An empty pool should be able to shrink to capacity 0
1281 pool_guard.shrink_to_fit();
1282 assert_eq!(
1283 pool_guard.capacity(),
1284 0,
1285 "Empty pool should shrink to capacity 0"
1286 );
1287 }
1288
1289 #[test]
1290 fn by_ptr_drop_actually_cleans_up_pool() {
1291 // Test ptr-based pooled events cleanup by checking pool state
1292 for iteration in 0..10 {
1293 {
1294 let pool = Box::pin(OnceEventPool::<u32>::new());
1295 // SAFETY: We pin the pool for the duration of by_ptr call
1296 let (_sender, _receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1297 // sender and receiver will be dropped here
1298 }
1299
1300 // For this test, we'll verify that repeated operations don't accumulate
1301 // If Drop implementations don't work, we'd see memory accumulation
1302 println!("Iteration {iteration}: Pool operations completed");
1303 }
1304 }
1305
1306 #[test]
1307 fn dec_ref_and_cleanup_actually_removes_events() {
1308 let pool = OnceEventPool::<u32>::new();
1309
1310 // Test 1: Check that events are added to pool
1311 let pool_len_before = {
1312 let pool_guard = pool.pool.lock().unwrap();
1313 pool_guard.len()
1314 };
1315
1316 // Create events in a scope to ensure they're dropped
1317 {
1318 let (sender, receiver) = pool.bind_by_ref();
1319
1320 // Events should be in pool now (don't check len while borrowed)
1321
1322 drop(sender);
1323 drop(receiver);
1324 }
1325
1326 // Now check that cleanup worked
1327 let pool_len_after = {
1328 let pool_guard = pool.pool.lock().unwrap();
1329 pool_guard.len()
1330 };
1331 assert_eq!(
1332 pool_len_after, pool_len_before,
1333 "Pool not cleaned up after dropping events - dec_ref_and_cleanup not working"
1334 );
1335 }
1336
1337 #[test]
1338 fn shrink_to_fit_with_empty_pool_shrinks_to_zero() {
1339 let pool = OnceEventPool::<u32>::new();
1340
1341 // Create and drop events without using them
1342 for _ in 0..10 {
1343 drop(pool.bind_by_ref());
1344 }
1345
1346 assert_eq!(pool.len(), 0);
1347 assert!(pool.is_empty());
1348
1349 // Shrink the pool to fit
1350 pool.shrink_to_fit();
1351
1352 assert_eq!(
1353 pool.pool.lock().unwrap().capacity(),
1354 0,
1355 "Empty pool should shrink to capacity 0"
1356 );
1357 }
1358
1359 #[test]
1360 fn event_removed_from_pool_after_endpoints_immediate_drop() {
1361 let pool = OnceEventPool::<u32>::new();
1362
1363 drop(pool.bind_by_ref());
1364
1365 assert_eq!(pool.len(), 0);
1366 assert!(pool.is_empty());
1367 }
1368
1369 #[test]
1370 fn pool_len_and_is_empty_methods() {
1371 let pool = OnceEventPool::<u32>::new();
1372
1373 // Initially empty
1374 assert_eq!(pool.len(), 0);
1375 assert!(pool.is_empty());
1376
1377 // Create first event
1378 let (sender1, receiver1) = pool.bind_by_ref();
1379 assert_eq!(pool.len(), 1);
1380 assert!(!pool.is_empty());
1381
1382 // Create second event while first is still bound
1383 let (sender2, receiver2) = pool.bind_by_ref();
1384 assert_eq!(pool.len(), 2);
1385 assert!(!pool.is_empty());
1386
1387 // Drop first event endpoints
1388 drop(sender1);
1389 drop(receiver1);
1390 assert_eq!(pool.len(), 1);
1391 assert!(!pool.is_empty());
1392
1393 // Drop second event endpoints
1394 drop(sender2);
1395 drop(receiver2);
1396 assert_eq!(pool.len(), 0);
1397 assert!(pool.is_empty());
1398 }
1399
1400 #[test]
1401 fn pooled_event_receiver_gets_disconnected_when_sender_dropped() {
1402 with_watchdog(|| {
1403 futures::executor::block_on(async {
1404 let pool = OnceEventPool::<i32>::new();
1405 let (sender, receiver) = pool.bind_by_ref();
1406
1407 // Drop the sender without sending anything
1408 drop(sender);
1409
1410 // Receiver should get a Disconnected error
1411 let result = receiver.await;
1412 assert!(matches!(result, Err(Disconnected)));
1413 assert!(matches!(result, Err(Disconnected)));
1414 });
1415 });
1416 }
1417
1418 #[test]
1419 fn pooled_event_by_arc_receiver_gets_disconnected_when_sender_dropped() {
1420 with_watchdog(|| {
1421 futures::executor::block_on(async {
1422 let pool = Arc::new(OnceEventPool::<i32>::new());
1423 let (sender, receiver) = pool.bind_by_arc();
1424
1425 // Drop the sender without sending anything
1426 drop(sender);
1427
1428 // Receiver should get a Disconnected error
1429 let result = receiver.await;
1430 assert!(matches!(result, Err(Disconnected)));
1431 assert!(matches!(result, Err(Disconnected)));
1432 });
1433 });
1434 }
1435
1436 #[test]
1437 fn pooled_event_by_ptr_receiver_gets_disconnected_when_sender_dropped() {
1438 with_watchdog(|| {
1439 futures::executor::block_on(async {
1440 let pool = Box::pin(OnceEventPool::<i32>::new());
1441
1442 // SAFETY: We ensure the pool is pinned and outlives the sender and receiver
1443 let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1444
1445 // Drop the sender without sending anything
1446 drop(sender);
1447
1448 // Receiver should get a Disconnected error
1449 let result = receiver.await;
1450 assert!(result.is_err());
1451 assert!(matches!(result, Err(Disconnected)));
1452 });
1453 });
1454 }
1455
1456 #[test]
1457 fn pooled_sender_dropped_when_awaiting_signals_disconnected() {
1458 let pool = OnceEventPool::<i32>::new();
1459 let (sender, receiver) = pool.bind_by_ref();
1460
1461 let mut receiver = pin!(receiver);
1462 let mut context = task::Context::from_waker(noop_waker_ref());
1463 assert!(matches!(
1464 receiver.as_mut().poll(&mut context),
1465 task::Poll::Pending
1466 ));
1467
1468 drop(sender);
1469
1470 let mut context = task::Context::from_waker(noop_waker_ref());
1471 assert!(matches!(
1472 receiver.as_mut().poll(&mut context),
1473 task::Poll::Ready(Err(Disconnected))
1474 ));
1475 }
1476
1477 #[cfg(debug_assertions)]
1478 #[test]
1479 fn inspect_awaiters_empty_pool() {
1480 let pool = OnceEventPool::<i32>::new();
1481
1482 let mut count = 0;
1483 pool.inspect_awaiters(|_| {
1484 count += 1;
1485 });
1486
1487 assert_eq!(count, 0);
1488 }
1489
1490 #[cfg(debug_assertions)]
1491 #[test]
1492 fn inspect_awaiters_no_awaiters() {
1493 let pool = OnceEventPool::<String>::new();
1494
1495 // Create some events but don't await them. They must still be inspected.
1496 let (_sender1, _receiver1) = pool.bind_by_ref();
1497 let (_sender2, _receiver2) = pool.bind_by_ref();
1498
1499 let mut count = 0;
1500 pool.inspect_awaiters(|_| {
1501 count += 1;
1502 });
1503
1504 assert_eq!(count, 2);
1505 }
1506
1507 #[cfg(debug_assertions)]
1508 #[test]
1509 fn inspect_awaiters_with_awaiters() {
1510 let pool = OnceEventPool::<i32>::new();
1511
1512 // Create events and start awaiting them
1513 let (_sender1, receiver1) = pool.bind_by_ref();
1514 let (_sender2, receiver2) = pool.bind_by_ref();
1515
1516 let mut context = task::Context::from_waker(noop_waker_ref());
1517 let mut pinned_receiver1 = pin!(receiver1);
1518 let mut pinned_receiver2 = pin!(receiver2);
1519
1520 // Poll both receivers to create awaiters
1521 let _poll1 = pinned_receiver1.as_mut().poll(&mut context);
1522 let _poll2 = pinned_receiver2.as_mut().poll(&mut context);
1523
1524 let mut count = 0;
1525 pool.inspect_awaiters(|_backtrace| {
1526 count += 1;
1527 });
1528
1529 assert_eq!(count, 2);
1530 }
1531
1532 #[test]
1533 fn pooled_receiver_into_value_with_sent_value() {
1534 with_watchdog(|| {
1535 let pool = Arc::new(OnceEventPool::<String>::new());
1536 let (sender, receiver) = pool.bind_by_arc();
1537 sender.send("test value".to_string());
1538
1539 let result = receiver.into_value();
1540 assert_eq!(result.unwrap(), Ok("test value".to_string()));
1541 });
1542 }
1543
1544 #[test]
1545 fn pooled_receiver_into_value_no_value_sent() {
1546 with_watchdog(|| {
1547 let pool = Arc::new(OnceEventPool::<i32>::new());
1548 let (_sender, receiver) = pool.bind_by_arc();
1549
1550 let result = receiver.into_value();
1551 match result {
1552 Err(_) => {} // Expected - receiver returned
1553 _ => panic!("Expected NotReady error when sender not ready"),
1554 }
1555 });
1556 }
1557
1558 #[test]
1559 fn pooled_receiver_into_value_sender_disconnected() {
1560 with_watchdog(|| {
1561 let pool = Arc::new(OnceEventPool::<String>::new());
1562 let (sender, receiver) = pool.bind_by_arc();
1563 drop(sender); // Disconnect without sending
1564
1565 let result = receiver.into_value();
1566 match result {
1567 Ok(Err(Disconnected)) => {} // Expected - disconnected
1568 _ => panic!("Expected Ok(Err(Disconnected)) when sender disconnected"),
1569 }
1570 });
1571 }
1572
1573 #[test]
1574 fn pooled_receiver_into_value_with_ref_pool() {
1575 with_watchdog(|| {
1576 let pool = OnceEventPool::<i32>::new();
1577 let (sender, receiver) = pool.bind_by_ref();
1578 sender.send(42);
1579
1580 let result = receiver.into_value();
1581 assert_eq!(result.unwrap(), Ok(42));
1582 });
1583 }
1584
1585 #[test]
1586 fn pooled_receiver_into_value_with_arc_pool() {
1587 with_watchdog(|| {
1588 let pool = Arc::new(OnceEventPool::<String>::new());
1589 let (sender, receiver) = pool.bind_by_arc();
1590 sender.send("arc test".to_string());
1591
1592 let result = receiver.into_value();
1593 assert_eq!(result.unwrap(), Ok("arc test".to_string()));
1594 });
1595 }
1596
1597 #[test]
1598 fn pooled_receiver_into_value_with_ptr_pool() {
1599 with_watchdog(|| {
1600 let pool = Box::pin(OnceEventPool::<i32>::new());
1601 // SAFETY: Pool is pinned and outlives the sender/receiver.
1602 let (sender, receiver) = unsafe { pool.as_ref().bind_by_ptr() };
1603 sender.send(999);
1604
1605 let result = receiver.into_value();
1606 assert_eq!(result.unwrap(), Ok(999));
1607 });
1608 }
1609
1610 #[test]
1611 fn pooled_receiver_into_value_returns_none_after_poll() {
1612 with_watchdog(|| {
1613 futures::executor::block_on(async {
1614 let pool = Arc::new(OnceEventPool::<i32>::new());
1615 let (sender, mut receiver) = pool.bind_by_arc();
1616 sender.send(42);
1617
1618 // Poll the receiver first
1619 let waker = noop_waker_ref();
1620 let mut context = task::Context::from_waker(waker);
1621 let poll_result = Pin::new(&mut receiver).poll(&mut context);
1622 assert_eq!(poll_result, task::Poll::Ready(Ok(42)));
1623
1624 // This should return None since the receiver was already consumed
1625 let value = receiver.into_value();
1626 match value {
1627 Err(_) => {} // Expected - receiver returned after consumption
1628 _ => panic!("Expected NotReady error after receiver consumption"),
1629 }
1630 });
1631 });
1632 }
1633
1634 #[test]
1635 fn pooled_receiver_into_value_pool_reuse() {
1636 with_watchdog(|| {
1637 let pool = Arc::new(OnceEventPool::<i32>::new());
1638
1639 // First usage
1640 let (sender1, receiver1) = pool.bind_by_arc();
1641 sender1.send(123);
1642 let result1 = receiver1.into_value();
1643 assert_eq!(result1.unwrap(), Ok(123));
1644
1645 // Second usage - should reuse the event from the pool
1646 let (sender2, receiver2) = pool.bind_by_arc();
1647 sender2.send(456);
1648 let result2 = receiver2.into_value();
1649 assert_eq!(result2.unwrap(), Ok(456));
1650 });
1651 }
1652
1653 #[test]
1654 fn pooled_receiver_into_value_multiple_event_types() {
1655 with_watchdog(|| {
1656 // Test with different value types
1657 let pool1 = Arc::new(OnceEventPool::<()>::new());
1658 let (sender1, receiver1) = pool1.bind_by_arc();
1659 sender1.send(());
1660 assert_eq!(receiver1.into_value().unwrap(), Ok(()));
1661
1662 let pool2 = Arc::new(OnceEventPool::<Vec<i32>>::new());
1663 let (sender2, receiver2) = pool2.bind_by_arc();
1664 sender2.send(vec![1, 2, 3]);
1665 assert_eq!(receiver2.into_value().unwrap(), Ok(vec![1, 2, 3]));
1666
1667 let pool3 = Arc::new(OnceEventPool::<Option<String>>::new());
1668 let (sender3, receiver3) = pool3.bind_by_arc();
1669 sender3.send(Some("nested option".to_string()));
1670 assert_eq!(
1671 receiver3.into_value().unwrap(),
1672 Ok(Some("nested option".to_string()))
1673 );
1674 });
1675 }
1676
1677 #[test]
1678 fn thread_safety() {
1679 // The pool is accessed across threads, so requires Sync as well as Send.
1680 assert_impl_all!(OnceEventPool<u32>: Send, Sync);
1681
1682 // These are all meant to be consumed locally - they may move between threads but are
1683 // not shared between threads, so Sync is not expected, only Send.
1684 assert_impl_all!(PooledOnceSender<RefPool<'static, u32>>: Send);
1685 assert_impl_all!(PooledOnceReceiver<RefPool<'static, u32>>: Send);
1686 assert_impl_all!(PooledOnceSender<ArcPool<u32>>: Send);
1687 assert_impl_all!(PooledOnceReceiver<ArcPool<u32>>: Send);
1688 assert_impl_all!(PooledOnceSender<PtrPool<u32>>: Send);
1689 assert_impl_all!(PooledOnceReceiver<PtrPool<u32>>: Send);
1690 assert_not_impl_any!(PooledOnceSender<RefPool<'static, u32>>: Sync);
1691 assert_not_impl_any!(PooledOnceReceiver<RefPool<'static, u32>>: Sync);
1692 assert_not_impl_any!(PooledOnceSender<ArcPool<u32>>: Sync);
1693 assert_not_impl_any!(PooledOnceReceiver<ArcPool<u32>>: Sync);
1694 assert_not_impl_any!(PooledOnceSender<PtrPool<u32>>: Sync);
1695 assert_not_impl_any!(PooledOnceReceiver<PtrPool<u32>>: Sync);
1696 }
1697}