1use std::mem::ManuallyDrop;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::task::Poll;
16use atomic_waker::AtomicWaker;
17use logwise::perfwarn_begin;
18
19#[derive(Debug)]
20pub struct AtomicLockAsync<T> {
21 lock: atomiclock::AtomicLock<T>,
22 wakelist: atomiclock_spinlock::Lock<Vec<Arc<AtomicWaker>>>,
23}
24
25
26#[derive(Debug)]
27pub struct Guard<'a, T> {
28 _guard: ManuallyDrop<atomiclock::Guard<'a, T>>,
29 lock: &'a AtomicLockAsync<T>,
30}
31
32
33#[derive(Debug)]
34#[must_use]
35pub struct LockFuture<'a, T> {
36 lock: &'a AtomicLockAsync<T>,
37 registered_waker: Option<Arc<AtomicWaker>>,
38}
39
40
41impl<T> AtomicLockAsync<T> {
42 pub const fn new(t: T) -> Self {
46 AtomicLockAsync {
47 lock: atomiclock::AtomicLock::new(t),
48 wakelist: atomiclock_spinlock::Lock::new(vec![])
49 }
50 }
51
52
53 pub fn lock_if_available(&self) -> Option<Guard<'_, T>> {
57 self.lock.lock()
58 .map(|guard| Guard { _guard: ManuallyDrop::new(guard), lock: self })
59 }
60
61 pub fn lock(&self) -> LockFuture<T> {
65 LockFuture{ lock: self, registered_waker: None }
66 }
67
68 pub fn lock_warn(&self) -> LockWarnFuture<T> {
74 LockWarnFuture{ underlying_future: self.lock(), perfwarn_interval: None }
75 }
76
77 pub fn into_inner(self) -> T {
81 self.lock.into_inner()
82 }
83}
84
85impl<T> Drop for Guard<'_, T> {
86 fn drop(&mut self) {
87 unsafe{ManuallyDrop::drop(&mut self._guard)}; {
90 let mut lock = self.lock.wakelist.spin_lock_warn();
91 for drain in lock.drain(..) {
92 drain.wake();
93 }
94 }
95
96 }
97}
98
99impl<T> Guard<'_, T> {
100 pub const fn lock(&self) -> &AtomicLockAsync<T> {
104 self.lock
105 }
106}
107
108impl<'a, T> std::future::Future for LockFuture<'a, T> {
109 type Output = Guard<'a, T>;
110
111 fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
112 match self.lock.lock.lock() {
113 Some(guard) => {
114 std::task::Poll::Ready(Guard{_guard: ManuallyDrop::new(guard), lock: self.lock})
115 },
116 None => {
117 match self.registered_waker {
118 Some(ref waker) => {
119 waker.register(cx.waker());
120 Poll::Pending
121 },
122 None => {
123 let waker = Arc::new(AtomicWaker::new());
124 waker.register(cx.waker());
125 self.lock.wakelist.spin_lock_warn().push(waker.clone());
126 self.registered_waker = Some(waker);
127
128 Poll::Pending
129 }
130 }
131 }
132 }
133 }
134}
135
136
137#[derive(Debug)]
138#[must_use]
139pub struct LockWarnFuture<'a, T> {
140 underlying_future: LockFuture<'a, T>,
141 perfwarn_interval: Option<logwise::interval::PerfwarnInterval>,
142}
143
144impl<'a, T> std::future::Future for LockWarnFuture<'a, T> {
145 type Output = Guard<'a, T>;
146
147 fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
148 let unchecked_mut = unsafe{self.get_unchecked_mut()};
149 if let None = unchecked_mut.perfwarn_interval {
150 unchecked_mut.perfwarn_interval = Some(perfwarn_begin!("AtomicLockAsync::lock"));
151 }
152 let underlying_future = unsafe{Pin::new_unchecked(&mut unchecked_mut.underlying_future)};
153 let r = underlying_future.poll(cx);
154 if let std::task::Poll::Ready(_) = r {
155 unchecked_mut.perfwarn_interval.take();
156 }
157 r
158 }
159}
160
161 impl <T: Default> Default for AtomicLockAsync<T> {
171 fn default() -> Self {
172 AtomicLockAsync::new(T::default())
173 }
174}
175
176impl <T> From<T> for AtomicLockAsync<T> {
180 fn from(t: T) -> Self {
181 AtomicLockAsync::new(t)
182 }
183}
184
185impl<'a, T> AsRef<T> for Guard<'a, T> {
200 fn as_ref(&self) -> &T {
201 self._guard.as_ref()
202 }
203}
204
205impl<'a, T> AsMut<T> for Guard<'a, T> {
206 fn as_mut(&mut self) -> &mut T {
207 self._guard.as_mut()
208 }
209}
210
211impl<'a, T> std::ops::Deref for Guard<'a, T> {
214 type Target = T;
215
216 fn deref(&self) -> &Self::Target {
217 self._guard.deref()
218 }
219}
220
221impl<'a, T> std::ops::DerefMut for Guard<'a, T> {
222 fn deref_mut(&mut self) -> &mut Self::Target {
223 self._guard.deref_mut()
224 }
225}
226
227
228
229