events_once/pool/
raw_sync.rs

1use std::any::type_name;
2#[cfg(debug_assertions)]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt;
6use std::marker::PhantomData;
7use std::mem::MaybeUninit;
8use std::pin::Pin;
9use std::ptr::NonNull;
10
11use infinity_pool::RawPinnedPool;
12use parking_lot::Mutex;
13
14use crate::{Event, RawPooledReceiver, RawPooledRef, RawPooledSender, ReceiverCore, SenderCore};
15
/// A pool of reusable thread-safe one-time events with manual pool lifecycle management.
///
/// # Examples
///
/// ```
/// use events_once::RawEventPool;
///
/// # #[tokio::main]
/// # async fn main() {
/// let pool = Box::pin(RawEventPool::<String>::new());
///
/// for i in 0..3 {
///     // SAFETY: We promise the pool outlives both the returned endpoints.
///     let (tx, rx) = unsafe { pool.as_ref().rent() };
///
///     tx.send(format!("Message {i}"));
///
///     let message = rx.await.unwrap();
///     println!("{message}");
/// }
/// # }
/// ```
pub struct RawEventPool<T: Send + 'static> {
    // This is in an UnsafeCell to logically "detach" it from the parent object.
    // We will create direct (shared) references to the contents of the cell not only from
    // the pool but also from the event references themselves. This is safe as long as
    // we never create conflicting references. We could not guarantee that for the parent
    // object but we can guarantee it for the cell contents.
    core: NonNull<UnsafeCell<RawEventPoolCore<T>>>,

    // Tells the compiler that the pool logically owns `T` values (they live in the
    // heap-allocated core we point to), even though we only hold a raw pointer.
    _owns_some: PhantomData<T>,
}
48
49#[cfg_attr(coverage_nightly, coverage(off))] // No API contract to test.
50impl<T: Send + 'static> fmt::Debug for RawEventPool<T> {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        f.debug_struct(type_name::<Self>())
53            .field("core", &self.core)
54            .finish()
55    }
56}
57
impl<T: Send + 'static> Drop for RawEventPool<T> {
    #[cfg_attr(test, mutants::skip)] // Impractical to test deallocation - Miri will complain if we leak.
    fn drop(&mut self) {
        // SAFETY: We are the owner of the core, so we know it remains valid.
        // Anyone calling rent() has to promise that we outlive the rented event
        // which means that we must be the last remaining user of the core.
        // Reconstituting the Box created in new() releases the allocation exactly once.
        drop(unsafe { Box::from_raw(self.core.as_ptr()) });
    }
}
67
/// The heap-allocated shared state behind a `RawEventPool`.
///
/// Referenced via shared references both by the pool and by the event references
/// handed out to endpoints; the `Mutex` provides the actual synchronization.
pub(crate) struct RawEventPoolCore<T: Send + 'static> {
    /// Storage for the pooled events. The `UnsafeCell<MaybeUninit<..>>` layering lets
    /// event endpoints work with their slot in place while the pool manages the
    /// allocation and reuse of slots.
    pub(crate) pool: Mutex<RawPinnedPool<UnsafeCell<MaybeUninit<Event<T>>>>>,
}
71
72#[cfg_attr(coverage_nightly, coverage(off))] // No API contract to test.
73impl<T: Send + 'static> fmt::Debug for RawEventPoolCore<T> {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        f.debug_struct(type_name::<Self>())
76            .field("pool", &self.pool)
77            .finish()
78    }
79}
80
81impl<T: Send + 'static> RawEventPool<T> {
82    /// Creates a new empty event pool.
83    #[must_use]
84    pub fn new() -> Self {
85        let core = RawEventPoolCore {
86            pool: Mutex::new(RawPinnedPool::new()),
87        };
88
89        let core_ptr = Box::into_raw(Box::new(UnsafeCell::new(core)));
90
91        Self {
92            // SAFETY: Boxed object is never null.
93            core: unsafe { NonNull::new_unchecked(core_ptr) },
94            _owns_some: PhantomData,
95        }
96    }
97
98    /// Rents an event from the pool, returning its endpoints.
99    ///
100    /// The event will be returned to the pool when both endpoints are dropped.
101    ///
102    /// # Safety
103    ///
104    /// The caller must guarantee that the pool outlives the endpoints.
105    #[must_use]
106    #[cfg_attr(test, mutants::skip)] // Cargo-mutants tries a boatload of unviable mutations and wastes time on this.
107    pub unsafe fn rent(self: Pin<&Self>) -> (RawPooledSender<T>, RawPooledReceiver<T>) {
108        let storage = {
109            // SAFETY: We are the owner of the core, so we know it remains valid. We only ever
110            // create shared references to it, so no conflicting exclusive references can exist.
111            let core_cell = unsafe { self.core.as_ref() };
112
113            // SAFETY: See above.
114            let core_maybe = unsafe { core_cell.get().as_ref() };
115
116            // SAFETY: UnsafeCell pointer is never null.
117            let core = unsafe { core_maybe.unwrap_unchecked() };
118
119            let mut pool = core.pool.lock();
120
121            // SAFETY: We are required to initialize the storage of the item we store in the pool.
122            // We do - that is what new_in_inner is for.
123            #[expect(
124                clippy::multiple_unsafe_ops_per_block,
125                unused_unsafe,
126                reason = "it cannot handle the closure"
127            )]
128            unsafe {
129                pool.insert_with(|place| {
130                    // This is a sandwich of MaybeUninit<UnsafeCell<MaybeUninit<Event<T>>>>.
131                    // The outer MaybeUninit is for the pool to manage uninitialized storage.
132                    // It does not know that we are expecting to use the internal MaybeUninit
133                    // instead (which we want to do to preserve the UnsafeCell around everything).
134                    //
135                    // SAFETY: We still treat it as uninitialized due to the inner MaybeUninit.
136                    let place = unsafe { place.assume_init_mut() };
137
138                    Event::new_in_inner(place);
139                })
140            }
141        }
142        .into_shared();
143
144        let event_ref = RawPooledRef::new(self.core, storage);
145
146        let inner_sender = SenderCore::new(event_ref.clone());
147        let inner_receiver = ReceiverCore::new(event_ref);
148
149        (
150            RawPooledSender::new(inner_sender),
151            RawPooledReceiver::new(inner_receiver),
152        )
153    }
154
155    /// Returns `true` if no events have currently been rented from the pool.
156    #[must_use]
157    pub fn is_empty(&self) -> bool {
158        // SAFETY: We are the owner of the core, so we know it remains valid. We only ever
159        // create shared references to it, so no conflicting exclusive references can exist.
160        let core_cell = unsafe { self.core.as_ref() };
161
162        // SAFETY: See above.
163        let core_maybe = unsafe { core_cell.get().as_ref() };
164
165        // SAFETY: UnsafeCell pointer is never null.
166        let core = unsafe { core_maybe.unwrap_unchecked() };
167
168        let pool = core.pool.lock();
169
170        pool.is_empty()
171    }
172
173    /// Returns the number of events that have currently been rented from the pool.
174    #[must_use]
175    pub fn len(&self) -> usize {
176        // SAFETY: We are the owner of the core, so we know it remains valid. We only ever
177        // create shared references to it, so no conflicting exclusive references can exist.
178        let core_cell = unsafe { self.core.as_ref() };
179
180        // SAFETY: See above.
181        let core_maybe = unsafe { core_cell.get().as_ref() };
182
183        // SAFETY: UnsafeCell pointer is never null.
184        let core = unsafe { core_maybe.unwrap_unchecked() };
185
186        let pool = core.pool.lock();
187
188        pool.len()
189    }
190
191    /// Uses the provided closure to inspect the backtraces of the most recent awaiter of each
192    /// awaited event in the pool.
193    ///
194    /// This method is only available in debug builds (`cfg(debug_assertions)`).
195    /// For any data to be present, `RUST_BACKTRACE=1` or `RUST_LIB_BACKTRACE=1` must be set.
196    ///
197    /// The closure is called once for each event in the pool that has been awaited at some point
198    /// in the past.
199    #[cfg(debug_assertions)]
200    pub fn inspect_awaiters(&self, mut f: impl FnMut(&Backtrace)) {
201        // SAFETY: We are the owner of the core, so we know it remains valid. We only ever
202        // create shared references to it, so no conflicting exclusive references can exist.
203        let core_cell = unsafe { self.core.as_ref() };
204
205        // SAFETY: See above.
206        let core_maybe = unsafe { core_cell.get().as_ref() };
207
208        // SAFETY: UnsafeCell pointer is never null.
209        let core = unsafe { core_maybe.unwrap_unchecked() };
210
211        let pool = core.pool.lock();
212
213        for event_ptr in pool.iter() {
214            // SAFETY: The pool remains alive for the duration of this function call, satisfying
215            // the lifetime requirement. The pointer is valid as it comes from the pool's iterator.
216            // We only ever create shared references to the events, so no conflicting exclusive
217            // references can exist.
218            let event_cell = unsafe { event_ptr.as_ref() };
219
220            // SAFETY: See above.
221            let event_maybe = unsafe { event_cell.get().as_ref() };
222
223            // SAFETY: UnsafeCell pointer is never null.
224            let event = unsafe { event_maybe.unwrap_unchecked() };
225
226            // SAFETY: We only ever create shared references, never exclusive ones.
227            let event = unsafe { event.assume_init_ref() };
228
229            event.inspect_awaiter(|bt| {
230                if let Some(bt) = bt {
231                    f(bt);
232                }
233            });
234        }
235    }
236}
237
238impl<T: Send + 'static> Default for RawEventPool<T> {
239    fn default() -> Self {
240        Self::new()
241    }
242}
243
// SAFETY: The pool is thread-safe - the only reason it does not get `Send`/`Sync` via auto
// traits is the `NonNull` pointer field, which disables them. All access to the pointed-to
// core's state goes through its `Mutex`, so concurrent use is properly synchronized.
unsafe impl<T: Send + 'static> Send for RawEventPool<T> {}
// SAFETY: See above.
unsafe impl<T: Send + 'static> Sync for RawEventPool<T> {}
250
251#[cfg(test)]
252#[allow(clippy::undocumented_unsafe_blocks, reason = "test code, be concise")]
253#[cfg_attr(coverage_nightly, coverage(off))]
254mod tests {
255    use std::pin::pin;
256    use std::sync::{Arc, Barrier};
257    use std::task::{self, Poll, Waker};
258    use std::{iter, thread};
259
260    use spin_on::spin_on;
261    use static_assertions::assert_impl_all;
262
263    use super::*;
264    use crate::Disconnected;
265
266    assert_impl_all!(RawEventPool<u32>: Send, Sync);
267
    #[test]
    fn len() {
        // len() counts currently-rented events: it rises on rent() and falls only once
        // BOTH endpoints of an event have been dropped.
        let pool = pin!(RawEventPool::<i32>::new());

        assert_eq!(pool.len(), 0);

        let (sender1, receiver1) = unsafe { pool.as_ref().rent() };
        assert_eq!(pool.len(), 1);

        let (sender2, receiver2) = unsafe { pool.as_ref().rent() };
        assert_eq!(pool.len(), 2);

        drop(sender1);
        drop(receiver1);
        assert_eq!(pool.len(), 1);

        drop(sender2);
        drop(receiver2);
        assert_eq!(pool.len(), 0);
    }
288
    #[test]
    fn send_receive() {
        // Basic round trip: send a value, poll the receiver once, and verify the
        // event returns to the pool after both endpoints are gone.
        let pool = pin!(RawEventPool::<i32>::new());

        assert!(pool.is_empty());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };

        assert!(!pool.is_empty());

        {
            let mut receiver = pin!(receiver);

            sender.send(42);

            let mut cx = task::Context::from_waker(Waker::noop());

            let poll_result = receiver.as_mut().poll(&mut cx);
            assert!(matches!(poll_result, Poll::Ready(Ok(42))));
        }

        assert!(pool.is_empty());
    }
312
    #[test]
    fn send_receive_reused() {
        // Renting repeatedly in sequence exercises slot reuse - the pool must end
        // up empty again after every iteration.
        const ITERATIONS: usize = 32;

        let pool = pin!(RawEventPool::<i32>::new());

        assert!(pool.is_empty());

        for _ in 0..ITERATIONS {
            let (sender, receiver) = unsafe { pool.as_ref().rent() };
            let mut receiver = pin!(receiver);

            sender.send(42);

            let mut cx = task::Context::from_waker(Waker::noop());

            let poll_result = receiver.as_mut().poll(&mut cx);
            assert!(matches!(poll_result, Poll::Ready(Ok(42))));
        }

        assert!(pool.is_empty());
    }
335
    #[test]
    fn send_receive_reused_batches() {
        // Rents several events at once before completing any, so multiple slots are
        // live simultaneously and reused across batches.
        const ITERATIONS: usize = 4;
        const BATCH_SIZE: usize = 8;

        let pool = pin!(RawEventPool::<i32>::new());

        for _ in 0..ITERATIONS {
            let endpoints = iter::repeat_with(|| unsafe { pool.as_ref().rent() })
                .take(BATCH_SIZE)
                .collect::<Vec<_>>();

            for (sender, receiver) in endpoints {
                let mut receiver = pin!(receiver);

                sender.send(42);

                let mut cx = task::Context::from_waker(Waker::noop());

                let poll_result = receiver.as_mut().poll(&mut cx);
                assert!(matches!(poll_result, Poll::Ready(Ok(42))));
            }
        }
    }
360
    #[test]
    fn drop_send() {
        // Sending after the receiver was already dropped must complete cleanly
        // (the value is discarded, nothing panics or leaks).
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, _) = unsafe { pool.as_ref().rent() };

        sender.send(42);
    }
369
    #[test]
    fn drop_receive() {
        // Polling after the sender was dropped without sending must yield Disconnected.
        let pool = pin!(RawEventPool::<i32>::new());

        let (_, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = pin!(receiver);

        let mut cx = task::Context::from_waker(Waker::noop());

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Ready(Err(Disconnected))));
    }
382
    #[test]
    fn receive_drop_receive() {
        // A Pending poll followed by sender drop must resolve the next poll
        // to Disconnected rather than hanging.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = pin!(receiver);

        let mut cx = task::Context::from_waker(Waker::noop());

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Pending));

        drop(sender);

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Ready(Err(Disconnected))));
    }
400
    #[test]
    fn receive_drop_send() {
        // Dropping an already-polled (awaiting) receiver and then sending must
        // still complete cleanly.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = Box::pin(receiver);

        let mut cx = task::Context::from_waker(Waker::noop());

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Pending));

        drop(receiver);

        sender.send(42);
    }
417
    #[test]
    fn receive_drop_drop_receiver_first() {
        // Drop ordering after an awaiting poll: receiver first, then sender.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = Box::pin(receiver);

        let mut cx = task::Context::from_waker(Waker::noop());

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Pending));

        drop(receiver);
        drop(sender);
    }
433
    #[test]
    fn receive_drop_drop_sender_first() {
        // Drop ordering after an awaiting poll: sender first, then receiver.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = Box::pin(receiver);

        let mut cx = task::Context::from_waker(Waker::noop());

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Pending));

        drop(sender);
        drop(receiver);
    }
449
    #[test]
    fn drop_drop_receiver_first() {
        // Dropping both endpoints unused, receiver first - slot must be reclaimed.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };

        drop(receiver);
        drop(sender);
    }
459
    #[test]
    fn drop_drop_sender_first() {
        // Dropping both endpoints unused, sender first - slot must be reclaimed.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };

        drop(sender);
        drop(receiver);
    }
469
    #[test]
    fn is_ready() {
        // is_ready() flips from false to true once a value has been sent.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = pin!(receiver);

        assert!(!receiver.is_ready());

        sender.send(42);

        assert!(receiver.is_ready());

        let mut cx = task::Context::from_waker(Waker::noop());

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Ready(Ok(42))));
    }
488
    #[test]
    fn drop_is_ready() {
        // Dropping the sender also makes the receiver "ready" (with Disconnected).
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = pin!(receiver);

        assert!(!receiver.is_ready());

        drop(sender);

        assert!(receiver.is_ready());

        let mut cx = task::Context::from_waker(Waker::noop());

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Ready(Err(Disconnected))));
    }
507
    #[test]
    fn into_value() {
        // into_value() hands the receiver back via IntoValueError::Pending while no
        // value is available, and yields the value once one has been sent.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };

        let Err(crate::IntoValueError::Pending(receiver)) = receiver.into_value() else {
            panic!("Expected receiver to not be ready");
        };

        sender.send(42);

        assert!(matches!(receiver.into_value(), Ok(42)));
    }
522
    #[test]
    #[should_panic]
    fn panic_poll_after_completion() {
        // One-time contract: polling again after Ready must panic.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = pin!(receiver);

        sender.send(42);

        let mut cx = task::Context::from_waker(Waker::noop());

        assert!(matches!(
            receiver.as_mut().poll(&mut cx),
            Poll::Ready(Ok(42))
        ));

        _ = receiver.as_mut().poll(&mut cx);
    }
542
    #[test]
    #[should_panic]
    fn panic_is_ready_after_completion() {
        // One-time contract: querying is_ready() after Ready must panic.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = pin!(receiver);

        sender.send(42);

        let mut cx = task::Context::from_waker(Waker::noop());

        assert!(matches!(
            receiver.as_mut().poll(&mut cx),
            Poll::Ready(Ok(42))
        ));

        _ = receiver.is_ready();
    }
562
    #[test]
    fn send_receive_mt() {
        // Endpoints move to different threads; send happens-before receive
        // (the first thread is joined before the second starts).
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };

        thread::spawn(move || {
            sender.send(42);
        })
        .join()
        .unwrap();

        thread::spawn(move || {
            let mut receiver = pin!(receiver);
            let mut cx = task::Context::from_waker(Waker::noop());

            let poll_result = receiver.as_mut().poll(&mut cx);
            assert!(matches!(poll_result, Poll::Ready(Ok(42))));
        })
        .join()
        .unwrap();
    }
585
586    #[test]
587    fn receive_send_receive_mt() {
588        let pool = pin!(RawEventPool::<i32>::new());
589
590        let (sender, receiver) = unsafe { pool.as_ref().rent() };
591
592        let first_poll_completed = Arc::new(Barrier::new(2));
593        let first_poll_completed_clone = Arc::clone(&first_poll_completed);
594
595        let send_thread = thread::spawn(move || {
596            first_poll_completed.wait();
597
598            sender.send(42);
599        });
600
601        let receive_thread = thread::spawn(move || {
602            let mut receiver = pin!(receiver);
603            let mut cx = task::Context::from_waker(Waker::noop());
604
605            let poll_result = receiver.as_mut().poll(&mut cx);
606            assert!(matches!(poll_result, Poll::Pending));
607
608            first_poll_completed_clone.wait();
609
610            // We do not know how many polls this will take, so we switch into real async.
611            spin_on(async {
612                let result = &mut receiver.await;
613                assert!(matches!(result, Ok(42)));
614            });
615        });
616
617        send_thread.join().unwrap();
618        receive_thread.join().unwrap();
619    }
620
621    #[test]
622    fn send_receive_unbiased_mt() {
623        let pool = pin!(RawEventPool::<i32>::new());
624
625        let (sender, receiver) = unsafe { pool.as_ref().rent() };
626
627        let receive_thread = thread::spawn(move || {
628            spin_on(async {
629                let result = &mut receiver.await;
630                assert!(matches!(result, Ok(42)));
631            });
632        });
633
634        let send_thread = thread::spawn(move || {
635            sender.send(42);
636        });
637
638        send_thread.join().unwrap();
639        receive_thread.join().unwrap();
640    }
641
642    #[test]
643    fn drop_receive_unbiased_mt() {
644        let pool = pin!(RawEventPool::<i32>::new());
645
646        let (sender, receiver) = unsafe { pool.as_ref().rent() };
647
648        let receive_thread = thread::spawn(move || {
649            spin_on(async {
650                let result = &mut receiver.await;
651                assert!(matches!(result, Err(Disconnected)));
652            });
653        });
654
655        let send_thread = thread::spawn(move || {
656            drop(sender);
657        });
658
659        send_thread.join().unwrap();
660        receive_thread.join().unwrap();
661    }
662
    #[test]
    fn drop_send_unbiased_mt() {
        // Receiver drop races the send on separate threads; both must complete
        // cleanly regardless of interleaving.
        let pool = pin!(RawEventPool::<i32>::new());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };

        let receive_thread = thread::spawn(move || {
            drop(receiver);
        });

        let send_thread = thread::spawn(move || {
            sender.send(42);
        });

        send_thread.join().unwrap();
        receive_thread.join().unwrap();
    }
680
    #[cfg(debug_assertions)]
    #[test]
    fn inspect_awaiters_inspects_only_awaited() {
        // Only events that have actually been polled count as "awaited"; returning
        // an event to the pool removes it from the inspection set.
        let pool = pin!(RawEventPool::<i32>::new());

        let (_sender1, receiver1) = unsafe { pool.as_ref().rent() };
        let (sender2, receiver2) = unsafe { pool.as_ref().rent() };
        let (_sender3, _receiver3) = unsafe { pool.as_ref().rent() };

        let mut receiver1 = pin!(receiver1);
        let mut receiver2 = Box::pin(receiver2);

        let mut cx = task::Context::from_waker(Waker::noop());
        _ = receiver1.as_mut().poll(&mut cx);
        _ = receiver2.as_mut().poll(&mut cx);

        let mut inspected_count = 0;

        pool.inspect_awaiters(|_bt| {
            inspected_count += 1;
        });

        assert_eq!(inspected_count, 2);

        drop(sender2);
        drop(receiver2);

        let mut inspected_count = 0;

        pool.inspect_awaiters(|_bt| {
            inspected_count += 1;
        });

        assert_eq!(inspected_count, 1);
    }
716
    #[test]
    fn default_creates_functional_pool() {
        // Default must behave exactly like new(): empty and fully usable.
        let pool = pin!(RawEventPool::<i32>::default());

        assert!(pool.is_empty());

        let (sender, receiver) = unsafe { pool.as_ref().rent() };
        let mut receiver = pin!(receiver);

        sender.send(42);

        let mut cx = task::Context::from_waker(Waker::noop());

        let poll_result = receiver.as_mut().poll(&mut cx);
        assert!(matches!(poll_result, Poll::Ready(Ok(42))));
    }
733}