s2n_quic_core/sync/
atomic_waker.rs1use crate::sync::primitive::{Arc, AtomicBool, Ordering};
5use core::{
6 pin::Pin,
7 task::{Context, Poll, Waker},
8};
9use crossbeam_utils::CachePadded;
10
11pub use crate::sync::primitive::AtomicWaker;
12
13pub fn pair() -> (Handle, Handle) {
15 let storage = Arc::pin(Storage::default());
16
17 let a_ptr = &storage.a as *const _;
18 let b_ptr = &storage.b as *const _;
19 let is_open = &*storage.is_open as *const _;
20
21 let a = Handle {
22 local: a_ptr,
23 remote: b_ptr,
24 is_open,
25 storage: storage.clone(),
26 };
27
28 let b = Handle {
29 local: b_ptr,
30 remote: a_ptr,
31 is_open,
32 storage: storage.clone(),
33 };
34
35 (a, b)
36}
37
38#[derive(Debug)]
40pub struct Handle {
41 local: *const AtomicWaker,
43 remote: *const AtomicWaker,
44 is_open: *const AtomicBool,
45 #[allow(dead_code)]
46 storage: Pin<Arc<Storage>>,
47}
48
49unsafe impl Send for Handle {}
51unsafe impl Sync for Handle {}
53
54impl Handle {
55 #[inline]
57 pub fn register(&self, waker: &Waker) {
58 unsafe { (*self.local).register(waker) }
59 }
60
61 #[inline]
63 pub fn wake(&self) {
64 unsafe { (*self.remote).wake() }
65 }
66
67 #[inline]
72 pub fn is_open(&self) -> bool {
73 unsafe { (*self.is_open).load(Ordering::Acquire) }
74 }
75
76 #[inline]
78 pub fn poll_close(&mut self, cx: &mut Context) -> Poll<()> {
79 if !self.is_open() {
80 return Poll::Ready(());
81 }
82
83 self.register(cx.waker());
84
85 if !self.is_open() {
86 Poll::Ready(())
87 } else {
88 Poll::Pending
89 }
90 }
91}
92
93#[derive(Debug)]
94struct Storage {
95 a: AtomicWaker,
96 b: AtomicWaker,
97 is_open: CachePadded<AtomicBool>,
98}
99
100impl Default for Storage {
101 fn default() -> Self {
102 Self {
103 a: Default::default(),
104 b: Default::default(),
105 is_open: CachePadded::new(AtomicBool::new(true)),
106 }
107 }
108}
109
110impl Drop for Handle {
111 #[inline]
112 fn drop(&mut self) {
113 unsafe {
115 (*self.is_open).store(false, Ordering::Release);
116 }
117 self.wake();
118 }
119}