1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use core::{
cell::{Cell, UnsafeCell},
sync::atomic::{
AtomicBool,
Ordering::{Acquire, Release},
},
task::{Context, Poll},
};
use crate::wake_list::{WakeHandle, WakeList};
/// Single-slot hand-off cell shared between senders and receivers.
///
/// Despite the name this is not a blocking mutex: `lock` is a flag that
/// `store()` and `take()` try to set with an atomic swap, and a caller that
/// cannot make progress registers a waker on one of the wake lists and gets
/// `Poll::Pending` back instead of blocking.
pub(crate) struct Mutex<T> {
/// True if mutex is currently being accessed (set by an `Acquire` swap,
/// cleared by a `Release` store in `store()` / `take()`)
lock: AtomicBool,
/// Data in transit: `Some` after a successful `store()` until the next
/// successful `take()`; only accessed while `lock` is held
data: UnsafeCell<Option<T>>,
/// List of waiting senders (registered by `store()`, woken by `take()`)
send: WakeList,
/// List of waiting receivers (registered by `take()`, woken by `store()`)
recv: WakeList,
}
// SAFETY: `data` is the only field here that is not thread-safe by
// construction, and the methods visible in this file touch it only while
// holding `lock` (acquired with an `Acquire` swap, released with a `Release`
// store), so moving the mutex to another thread only needs `T: Send`.
// NOTE(review): this also relies on `WakeList` being safe to send and share
// across threads — confirm in `wake_list`.
unsafe impl<T: Send> Send for Mutex<T> {}
// SAFETY: through a shared `&Mutex<T>` a thread can only move a `T` in
// (`store()`) or out (`take()`) under the lock; the methods visible in this
// file never hand out a `&T`, so `T: Send` is sufficient and `T: Sync` is not
// required.
unsafe impl<T: Send> Sync for Mutex<T> {}
impl<T> Mutex<T> {
    /// Create a new mutex: unlocked, holding no value, with empty wake lists.
    pub(crate) const fn new() -> Self {
        Self {
            lock: AtomicBool::new(false),
            data: UnsafeCell::new(None),
            send: WakeList::new(),
            recv: WakeList::new(),
        }
    }

    /// Try to store data in the mutex.
    ///
    /// Returns `Poll::Ready(())` once the value has been moved out of `data`
    /// (leaving `None` in the cell) and into the mutex.
    ///
    /// Returns `Poll::Pending`, leaving `data` untouched, when either
    /// - the lock is held by another caller, or
    /// - the previously stored value has not been taken yet.
    ///
    /// In both `Pending` cases `wh` has been registered on the sender wake
    /// list with a clone of the waker from `cx`.
    ///
    /// Whenever the lock was obtained, it is released and then one receiver
    /// is woken, regardless of the outcome.
    pub(crate) fn store(
        &self,
        data: &Cell<Option<T>>,
        cx: &mut Context<'_>,
        wh: &mut WakeHandle,
    ) -> Poll<()> {
        // Tracks whether `wh` was registered before the lock was obtained.
        let mut registered = false;

        // Try to acquire lock (`swap` returns the previous value, so `true`
        // means somebody else already holds it).
        if self.lock.swap(true, Acquire) {
            // Data is contended, register sender to wake list
            wh.register(&self.send, cx.waker().clone());
            // Try again just in case registration is unnecessary
            if self.lock.swap(true, Acquire) {
                // Will be awoken
                return Poll::Pending;
            }
            registered = true;
        }

        // The lock is held from here until the `Release` store below.

        // SAFETY: `data` is only accessed while `lock` is held, and this call
        // holds it, so no other reference to the slot exists right now.
        let occupied = unsafe { (*self.data.get()).is_some() };

        let outcome = if occupied {
            // Can't send until receive: make sure this sender is registered
            // so that a later `take()` can wake it.
            if !registered {
                wh.register(&self.send, cx.waker().clone());
            }
            Poll::Pending
        } else {
            // Registration was unnecessary; replace the handle with a fresh
            // one (presumably dropping the old handle unregisters it —
            // confirm in `wake_list`).
            if registered {
                *wh = WakeHandle::new();
            }
            // SAFETY: still holding `lock`, see above.
            unsafe { *self.data.get() = data.take() };
            Poll::Ready(())
        };

        // Release lock
        self.lock.store(false, Release);
        // Wake a receiver
        self.recv.wake_one();

        outcome
    }

    /// Try to take data from the mutex.
    ///
    /// Returns `Poll::Ready(value)` when a value was stored; the slot is left
    /// empty afterwards.
    ///
    /// Returns `Poll::Pending` when either
    /// - the lock is held by another caller, or
    /// - no value has been stored yet.
    ///
    /// In both `Pending` cases `wh` has been registered on the receiver wake
    /// list with a clone of the waker from `cx`.
    ///
    /// Whenever the lock was obtained, it is released and then one sender is
    /// woken, regardless of the outcome.
    pub(crate) fn take(
        &self,
        cx: &mut Context<'_>,
        wh: &mut WakeHandle,
    ) -> Poll<T> {
        // Tracks whether `wh` was registered before the lock was obtained.
        let mut registered = false;

        // Try to acquire lock (`swap` returns the previous value, so `true`
        // means somebody else already holds it).
        if self.lock.swap(true, Acquire) {
            // Data is contended, register receiver to wake list
            wh.register(&self.recv, cx.waker().clone());
            // Try again just in case registration is unnecessary
            if self.lock.swap(true, Acquire) {
                // Will be awoken
                return Poll::Pending;
            }
            registered = true;
        }

        // The lock is held from here until the `Release` store below.

        // Inspect and empty the slot in a single step so there is no
        // separate "checked but then missing" state to handle.
        //
        // SAFETY: `data` is only accessed while `lock` is held, and this call
        // holds it, so no other reference to the slot exists right now.
        let outcome = match unsafe { (*self.data.get()).take() } {
            Some(value) => {
                // Registration was unnecessary; replace the handle with a
                // fresh one (presumably dropping the old handle unregisters
                // it — confirm in `wake_list`).
                if registered {
                    *wh = WakeHandle::new();
                }
                Poll::Ready(value)
            }
            None => {
                // Can't receive until send: make sure this receiver is
                // registered so that a later `store()` can wake it.
                if !registered {
                    wh.register(&self.recv, cx.waker().clone());
                }
                Poll::Pending
            }
        };

        // Release lock
        self.lock.store(false, Release);
        // Wake a sender
        self.send.wake_one();

        outcome
    }
}