1use std::any::type_name;
2#[cfg(debug_assertions)]
3use std::backtrace::Backtrace;
4use std::cell::UnsafeCell;
5use std::fmt;
6use std::marker::PhantomData;
7use std::mem::MaybeUninit;
8use std::pin::Pin;
9use std::ptr::NonNull;
10
11use infinity_pool::RawPinnedPool;
12use parking_lot::Mutex;
13
14use crate::{Event, RawPooledReceiver, RawPooledRef, RawPooledSender, ReceiverCore, SenderCore};
15
/// A pool that hands out sender/receiver pairs backed by pooled `Event<T>` storage.
///
/// Each call to `rent()` inserts a fresh event into a mutex-guarded `RawPinnedPool` and
/// returns a `RawPooledSender`/`RawPooledReceiver` pair that refers to it. The endpoints
/// are given a copy of the raw pointer to the pool's heap-allocated core, and that core
/// is freed when the pool is dropped.
pub struct RawEventPool<T: Send + 'static> {
    // Pointer to the heap-allocated core. It is created in `new()` and released in `Drop`;
    // `rent()` hands a copy of it to every endpoint pair.
    core: NonNull<UnsafeCell<RawEventPoolCore<T>>>,

    // NOTE(review): presumably marks the pool as logically owning values of `T`, since the
    // raw pointer above does not express ownership on its own - confirm the intent.
    _owns_some: PhantomData<T>,
}
48
49#[cfg_attr(coverage_nightly, coverage(off))] impl<T: Send + 'static> fmt::Debug for RawEventPool<T> {
51 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52 f.debug_struct(type_name::<Self>())
53 .field("core", &self.core)
54 .finish()
55 }
56}
57
58impl<T: Send + 'static> Drop for RawEventPool<T> {
59 #[cfg_attr(test, mutants::skip)] fn drop(&mut self) {
61 drop(unsafe { Box::from_raw(self.core.as_ptr()) });
65 }
66}
67
/// Heap-allocated state of a `RawEventPool`.
///
/// `RawEventPool::rent()` passes a pointer to this value to the endpoints it creates, so
/// it is reachable both from the pool and from rented endpoints.
pub(crate) struct RawEventPoolCore<T: Send + 'static> {
    /// Storage for the pooled events, guarded by a mutex.
    ///
    /// Each slot is an `UnsafeCell<MaybeUninit<Event<T>>>`; `RawEventPool::rent()` fills a
    /// slot in place via `Event::new_in_inner` while holding the lock.
    pub(crate) pool: Mutex<RawPinnedPool<UnsafeCell<MaybeUninit<Event<T>>>>>,
}
71
72#[cfg_attr(coverage_nightly, coverage(off))] impl<T: Send + 'static> fmt::Debug for RawEventPoolCore<T> {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 f.debug_struct(type_name::<Self>())
76 .field("pool", &self.pool)
77 .finish()
78 }
79}
80
81impl<T: Send + 'static> RawEventPool<T> {
82 #[must_use]
84 pub fn new() -> Self {
85 let core = RawEventPoolCore {
86 pool: Mutex::new(RawPinnedPool::new()),
87 };
88
89 let core_ptr = Box::into_raw(Box::new(UnsafeCell::new(core)));
90
91 Self {
92 core: unsafe { NonNull::new_unchecked(core_ptr) },
94 _owns_some: PhantomData,
95 }
96 }
97
98 #[must_use]
106 #[cfg_attr(test, mutants::skip)] pub unsafe fn rent(self: Pin<&Self>) -> (RawPooledSender<T>, RawPooledReceiver<T>) {
108 let storage = {
109 let core_cell = unsafe { self.core.as_ref() };
112
113 let core_maybe = unsafe { core_cell.get().as_ref() };
115
116 let core = unsafe { core_maybe.unwrap_unchecked() };
118
119 let mut pool = core.pool.lock();
120
121 #[expect(
124 clippy::multiple_unsafe_ops_per_block,
125 unused_unsafe,
126 reason = "it cannot handle the closure"
127 )]
128 unsafe {
129 pool.insert_with(|place| {
130 let place = unsafe { place.assume_init_mut() };
137
138 Event::new_in_inner(place);
139 })
140 }
141 }
142 .into_shared();
143
144 let event_ref = RawPooledRef::new(self.core, storage);
145
146 let inner_sender = SenderCore::new(event_ref.clone());
147 let inner_receiver = ReceiverCore::new(event_ref);
148
149 (
150 RawPooledSender::new(inner_sender),
151 RawPooledReceiver::new(inner_receiver),
152 )
153 }
154
155 #[must_use]
157 pub fn is_empty(&self) -> bool {
158 let core_cell = unsafe { self.core.as_ref() };
161
162 let core_maybe = unsafe { core_cell.get().as_ref() };
164
165 let core = unsafe { core_maybe.unwrap_unchecked() };
167
168 let pool = core.pool.lock();
169
170 pool.is_empty()
171 }
172
173 #[must_use]
175 pub fn len(&self) -> usize {
176 let core_cell = unsafe { self.core.as_ref() };
179
180 let core_maybe = unsafe { core_cell.get().as_ref() };
182
183 let core = unsafe { core_maybe.unwrap_unchecked() };
185
186 let pool = core.pool.lock();
187
188 pool.len()
189 }
190
191 #[cfg(debug_assertions)]
200 pub fn inspect_awaiters(&self, mut f: impl FnMut(&Backtrace)) {
201 let core_cell = unsafe { self.core.as_ref() };
204
205 let core_maybe = unsafe { core_cell.get().as_ref() };
207
208 let core = unsafe { core_maybe.unwrap_unchecked() };
210
211 let pool = core.pool.lock();
212
213 for event_ptr in pool.iter() {
214 let event_cell = unsafe { event_ptr.as_ref() };
219
220 let event_maybe = unsafe { event_cell.get().as_ref() };
222
223 let event = unsafe { event_maybe.unwrap_unchecked() };
225
226 let event = unsafe { event.assume_init_ref() };
228
229 event.inspect_awaiter(|bt| {
230 if let Some(bt) = bt {
231 f(bt);
232 }
233 });
234 }
235 }
236}
237
impl<T: Send + 'static> Default for RawEventPool<T> {
    /// Creates an empty pool by delegating to `RawEventPool::new()`.
    fn default() -> Self {
        Self::new()
    }
}
243
// SAFETY: NOTE(review): the raw `core` pointer is what suppresses the auto trait. The pool
// storage behind it is only accessed in this file while holding the `parking_lot::Mutex`
// in `RawEventPoolCore`, and `T: Send` is required - confirm that the `RawPinnedPool`
// storage itself may be moved across threads.
unsafe impl<T: Send + 'static> Send for RawEventPool<T> {}
// SAFETY: NOTE(review): every `&self` method in this file takes the pool mutex before
// touching the storage, so shared access from several threads is serialized - confirm
// that no other code in the crate bypasses the lock.
unsafe impl<T: Send + 'static> Sync for RawEventPool<T> {}
250
#[cfg(test)]
#[allow(clippy::undocumented_unsafe_blocks, reason = "test code, be concise")]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use std::pin::pin;
    use std::sync::{Arc, Barrier};
    use std::task::{self, Poll, Waker};
    use std::{iter, thread};

    use spin_on::spin_on;
    use static_assertions::assert_impl_all;

    use super::*;
    use crate::Disconnected;

    assert_impl_all!(RawEventPool<u32>: Send, Sync);

    /// Polls `future` exactly once with a no-op waker and returns the outcome.
    fn poll_once<F: Future>(future: Pin<&mut F>) -> Poll<F::Output> {
        let mut cx = task::Context::from_waker(Waker::noop());
        future.poll(&mut cx)
    }

    /// The reported length follows the number of live endpoint pairs.
    #[test]
    fn len() {
        let pool = pin!(RawEventPool::<i32>::new());
        assert_eq!(pool.len(), 0);

        let (tx_a, rx_a) = unsafe { pool.as_ref().rent() };
        assert_eq!(pool.len(), 1);

        let (tx_b, rx_b) = unsafe { pool.as_ref().rent() };
        assert_eq!(pool.len(), 2);

        drop(tx_a);
        drop(rx_a);
        assert_eq!(pool.len(), 1);

        drop(tx_b);
        drop(rx_b);
        assert_eq!(pool.len(), 0);
    }

    /// A sent value is received, and the pool is empty again afterwards.
    #[test]
    fn send_receive() {
        let pool = pin!(RawEventPool::<i32>::new());
        assert!(pool.is_empty());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        assert!(!pool.is_empty());

        {
            let mut rx = pin!(rx);

            tx.send(42);

            assert!(matches!(poll_once(rx.as_mut()), Poll::Ready(Ok(42))));
        }

        assert!(pool.is_empty());
    }

    /// Renting, sending and receiving repeatedly leaves the pool empty.
    #[test]
    fn send_receive_reused() {
        const ITERATIONS: usize = 32;

        let pool = pin!(RawEventPool::<i32>::new());
        assert!(pool.is_empty());

        for _ in 0..ITERATIONS {
            let (tx, rx) = unsafe { pool.as_ref().rent() };
            let mut rx = pin!(rx);

            tx.send(42);

            assert!(matches!(poll_once(rx.as_mut()), Poll::Ready(Ok(42))));
        }

        assert!(pool.is_empty());
    }

    /// Several events can be rented at once, over multiple rounds.
    #[test]
    fn send_receive_reused_batches() {
        const ITERATIONS: usize = 4;
        const BATCH_SIZE: usize = 8;

        let pool = pin!(RawEventPool::<i32>::new());

        for _ in 0..ITERATIONS {
            let batch = iter::repeat_with(|| unsafe { pool.as_ref().rent() })
                .take(BATCH_SIZE)
                .collect::<Vec<_>>();

            for (tx, rx) in batch {
                let mut rx = pin!(rx);

                tx.send(42);

                assert!(matches!(poll_once(rx.as_mut()), Poll::Ready(Ok(42))));
            }
        }
    }

    /// Sending after the receiver has been dropped does not panic.
    #[test]
    fn drop_send() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, _) = unsafe { pool.as_ref().rent() };

        tx.send(42);
    }

    /// Polling after the sender has been dropped reports disconnection.
    #[test]
    fn drop_receive() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (_, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = pin!(rx);

        assert!(matches!(
            poll_once(rx.as_mut()),
            Poll::Ready(Err(Disconnected))
        ));
    }

    /// A pending receiver observes disconnection once the sender is dropped.
    #[test]
    fn receive_drop_receive() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = pin!(rx);

        assert!(matches!(poll_once(rx.as_mut()), Poll::Pending));

        drop(tx);

        assert!(matches!(
            poll_once(rx.as_mut()),
            Poll::Ready(Err(Disconnected))
        ));
    }

    /// Sending after a polled receiver has been dropped does not panic.
    #[test]
    fn receive_drop_send() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = Box::pin(rx);

        assert!(matches!(poll_once(rx.as_mut()), Poll::Pending));

        drop(rx);

        tx.send(42);
    }

    /// After a pending poll, dropping receiver then sender is fine.
    #[test]
    fn receive_drop_drop_receiver_first() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = Box::pin(rx);

        assert!(matches!(poll_once(rx.as_mut()), Poll::Pending));

        drop(rx);
        drop(tx);
    }

    /// After a pending poll, dropping sender then receiver is fine.
    #[test]
    fn receive_drop_drop_sender_first() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = Box::pin(rx);

        assert!(matches!(poll_once(rx.as_mut()), Poll::Pending));

        drop(tx);
        drop(rx);
    }

    /// Unused endpoints can be dropped receiver-first.
    #[test]
    fn drop_drop_receiver_first() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };

        drop(rx);
        drop(tx);
    }

    /// Unused endpoints can be dropped sender-first.
    #[test]
    fn drop_drop_sender_first() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };

        drop(tx);
        drop(rx);
    }

    /// `is_ready()` flips to `true` once a value has been sent.
    #[test]
    fn is_ready() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = pin!(rx);

        assert!(!rx.is_ready());

        tx.send(42);

        assert!(rx.is_ready());

        assert!(matches!(poll_once(rx.as_mut()), Poll::Ready(Ok(42))));
    }

    /// `is_ready()` flips to `true` once the sender has been dropped.
    #[test]
    fn drop_is_ready() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = pin!(rx);

        assert!(!rx.is_ready());

        drop(tx);

        assert!(rx.is_ready());

        assert!(matches!(
            poll_once(rx.as_mut()),
            Poll::Ready(Err(Disconnected))
        ));
    }

    /// `into_value()` hands the receiver back while pending and the value once sent.
    #[test]
    fn into_value() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };

        let Err(crate::IntoValueError::Pending(rx)) = rx.into_value() else {
            panic!("Expected receiver to not be ready");
        };

        tx.send(42);

        assert!(matches!(rx.into_value(), Ok(42)));
    }

    /// Polling again after the value was already received panics.
    #[test]
    #[should_panic]
    fn panic_poll_after_completion() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = pin!(rx);

        tx.send(42);

        assert!(matches!(poll_once(rx.as_mut()), Poll::Ready(Ok(42))));

        _ = poll_once(rx.as_mut());
    }

    /// Querying readiness after the value was already received panics.
    #[test]
    #[should_panic]
    fn panic_is_ready_after_completion() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = pin!(rx);

        tx.send(42);

        assert!(matches!(poll_once(rx.as_mut()), Poll::Ready(Ok(42))));

        _ = rx.is_ready();
    }

    /// Send on one thread, then receive on another.
    #[test]
    fn send_receive_mt() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };

        let send_thread = thread::spawn(move || {
            tx.send(42);
        });
        send_thread.join().unwrap();

        let receive_thread = thread::spawn(move || {
            let mut rx = pin!(rx);

            assert!(matches!(poll_once(rx.as_mut()), Poll::Ready(Ok(42))));
        });
        receive_thread.join().unwrap();
    }

    /// The receiver polls once before the send happens, then awaits the value.
    #[test]
    fn receive_send_receive_mt() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };

        let sender_gate = Arc::new(Barrier::new(2));
        let receiver_gate = Arc::clone(&sender_gate);

        let send_thread = thread::spawn(move || {
            // Do not send until the receiver has completed its first poll.
            sender_gate.wait();

            tx.send(42);
        });

        let receive_thread = thread::spawn(move || {
            let mut rx = pin!(rx);

            assert!(matches!(poll_once(rx.as_mut()), Poll::Pending));

            receiver_gate.wait();

            spin_on(async {
                let outcome = rx.await;
                assert!(matches!(outcome, Ok(42)));
            });
        });

        send_thread.join().unwrap();
        receive_thread.join().unwrap();
    }

    /// Send and receive race on separate threads without any ordering imposed.
    #[test]
    fn send_receive_unbiased_mt() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };

        let receive_thread = thread::spawn(move || {
            spin_on(async {
                let outcome = rx.await;
                assert!(matches!(outcome, Ok(42)));
            });
        });

        let send_thread = thread::spawn(move || {
            tx.send(42);
        });

        send_thread.join().unwrap();
        receive_thread.join().unwrap();
    }

    /// Sender drop and receive race on separate threads.
    #[test]
    fn drop_receive_unbiased_mt() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };

        let receive_thread = thread::spawn(move || {
            spin_on(async {
                let outcome = rx.await;
                assert!(matches!(outcome, Err(Disconnected)));
            });
        });

        let send_thread = thread::spawn(move || {
            drop(tx);
        });

        send_thread.join().unwrap();
        receive_thread.join().unwrap();
    }

    /// Receiver drop and send race on separate threads.
    #[test]
    fn drop_send_unbiased_mt() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (tx, rx) = unsafe { pool.as_ref().rent() };

        let receive_thread = thread::spawn(move || {
            drop(rx);
        });

        let send_thread = thread::spawn(move || {
            tx.send(42);
        });

        send_thread.join().unwrap();
        receive_thread.join().unwrap();
    }

    /// Only events whose receiver has been polled are reported by `inspect_awaiters`.
    #[cfg(debug_assertions)]
    #[test]
    fn inspect_awaiters_inspects_only_awaited() {
        let pool = pin!(RawEventPool::<i32>::new());

        let (_sender1, receiver1) = unsafe { pool.as_ref().rent() };
        let (sender2, receiver2) = unsafe { pool.as_ref().rent() };
        let (_sender3, _receiver3) = unsafe { pool.as_ref().rent() };

        let mut receiver1 = pin!(receiver1);
        let mut receiver2 = Box::pin(receiver2);

        _ = poll_once(receiver1.as_mut());
        _ = poll_once(receiver2.as_mut());

        // Counts how many times the inspection callback is invoked.
        let count_awaiters = || {
            let mut seen = 0;

            pool.inspect_awaiters(|_bt| {
                seen += 1;
            });

            seen
        };

        assert_eq!(count_awaiters(), 2);

        drop(sender2);
        drop(receiver2);

        assert_eq!(count_awaiters(), 1);
    }

    /// A pool built through `Default` behaves like one built through `new()`.
    #[test]
    fn default_creates_functional_pool() {
        let pool = pin!(RawEventPool::<i32>::default());
        assert!(pool.is_empty());

        let (tx, rx) = unsafe { pool.as_ref().rent() };
        let mut rx = pin!(rx);

        tx.send(42);

        assert!(matches!(poll_once(rx.as_mut()), Poll::Ready(Ok(42))));
    }
}