1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
use std::marker::PhantomData;
use std::sync::Arc;
//use super::ResourceId;
use super::Dispatcher;
use super::RequiredResources;
// This holds the locks for resources that were acquired by the AcquireResources future
pub struct AcquiredResourcesLockGuards<T> {
_reads: Vec<tokio::sync::lock::LockGuard<()>>,
_writes: Vec<tokio::sync::lock::LockGuard<()>>,
phantom_data: PhantomData<T>,
}
impl<T> AcquiredResourcesLockGuards<T> {
fn new(
reads: Vec<tokio::sync::lock::LockGuard<()>>,
writes: Vec<tokio::sync::lock::LockGuard<()>>,
) -> Self {
AcquiredResourcesLockGuards::<T> {
_reads: reads,
_writes: writes,
phantom_data: PhantomData,
}
}
}
// Waits until the locks for all required resources can be gathered. The result is a struct that owns
// the guards for the resources
pub struct AcquireResources<T, ResourceId>
where
ResourceId: super::ResourceIdTrait,
{
id: usize,
dispatcher: Arc<Dispatcher<ResourceId>>,
state: AcquireResourcesState,
phantom_data: PhantomData<T>,
required_reads: Vec<ResourceId>,
required_writes: Vec<ResourceId>,
}
#[derive(Debug)]
enum AcquireResourcesState {
// We think we can acquire all required locks and are waiting for our turn to try
WaitForDispatch(tokio::sync::lock::Lock<()>),
// We were not able to acquire a lock we needed (this lock is pending on the resource we failed
// to get)
WaitForResource(tokio::sync::lock::Lock<()>),
// We acquired the resources
Finished,
}
impl<T, ResourceId> AcquireResources<T, ResourceId>
where
ResourceId: super::ResourceIdTrait,
{
pub fn new(
dispatcher: Arc<Dispatcher<ResourceId>>,
required_resources: RequiredResources<T, ResourceId>,
) -> Self {
AcquireResources::<T, ResourceId> {
id: dispatcher.take_task_id(),
state: AcquireResourcesState::WaitForDispatch(dispatcher.dispatch_lock().clone()),
dispatcher,
required_reads: required_resources.reads,
required_writes: required_resources.writes,
phantom_data: PhantomData,
}
}
}
enum TryTakeLocksResult<ResourceId>
where
ResourceId: super::ResourceIdTrait,
{
// All locks were successfully taken, contains the guards for those acquired locks
Success(Vec<tokio::sync::lock::LockGuard<()>>),
// A lock was not able to be captured, the lock here is the lock we need to await
Failure(ResourceId, tokio::sync::lock::Lock<()>),
}
impl<T, ResourceId> AcquireResources<T, ResourceId>
where
ResourceId: super::ResourceIdTrait,
{
// Tries to take all locks. If successful, returns a Vec of lock guards. Otherwise, returns the
// lock that failed (and needs to be awaited before trying to dispatch again)
fn try_take_locks(
&self,
required_resources: &Vec<ResourceId>,
) -> TryTakeLocksResult<ResourceId> {
let mut guards = vec![];
for resource in required_resources {
// We expect every resource type that we will try to fetch already has a lock set up
let mut lock = self
.dispatcher
.resource_locks()
.get(&resource)
.expect("A resource lock does not exist for a certain type.")
.clone();
match lock.poll_lock() {
futures::Async::Ready(guard) => guards.push(guard),
futures::Async::NotReady => {
return TryTakeLocksResult::Failure(resource.clone(), lock)
}
}
}
TryTakeLocksResult::Success(guards)
}
}
impl<T, ResourceId> futures::future::Future for AcquireResources<T, ResourceId>
where
ResourceId: super::ResourceIdTrait,
{
type Item = AcquiredResourcesLockGuards<T>;
type Error = ();
fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
trace!(
"<{}> Task woke up in state {}",
self.id,
match &self.state {
AcquireResourcesState::WaitForDispatch(_) => "WaitForDispatch",
AcquireResourcesState::WaitForResource(_) => "WaitForResource",
AcquireResourcesState::Finished => "Finished",
}
);
loop {
match &mut self.state {
// This state will wait for a lock on the main dispatch lock, and then try to
// take a lock on all resources it needs to progress. This is deadlock-safe since
// only one task is permitted to try to take locks at a time
AcquireResourcesState::WaitForDispatch(dispatch_lock) => {
let lock_result = {
// Wait until we get an exclusive lock to acquire resources. This is necessary since
// we're going to try to grabbing multiple locks at a time to avoid deadlocks.
trace!("<{}> Poll dispatch lock", self.id);
let _dispatch_guard = match dispatch_lock.poll_lock() {
futures::Async::Ready(guard) => guard,
futures::Async::NotReady => {
trace!("<{}> Not able to dispatch", self.id);
return Ok(futures::Async::NotReady);
}
};
// At this point we have exclusive permission to check if existing resources
// are available
trace!("<{}> Check resource locks", self.id);
// Try to get read access where needed
let read_guards = match self.try_take_locks(&self.required_reads) {
TryTakeLocksResult::Success(guards) => guards,
TryTakeLocksResult::Failure(resource_id, lock) => {
trace!(
"<{}> Failed to acquire read access for {:?}",
self.id,
resource_id
);
self.state = AcquireResourcesState::WaitForResource(lock);
return Ok(futures::Async::NotReady);
}
};
// Try to get write access where needed
let write_guards = match self.try_take_locks(&self.required_writes) {
TryTakeLocksResult::Success(guards) => guards,
TryTakeLocksResult::Failure(resource_id, lock) => {
trace!(
"<{}> Failed to acquire write access for {:?}",
self.id,
resource_id
);
self.state = AcquireResourcesState::WaitForResource(lock);
return Ok(futures::Async::NotReady);
}
};
trace!("<{}> Resource locks acquired", self.id);
// As long as this result is held, it will be safe to fetch the data from shred
AcquiredResourcesLockGuards::<T>::new(read_guards, write_guards)
};
self.state = AcquireResourcesState::Finished;
return Ok(futures::Async::Ready(lock_result));
}
AcquireResourcesState::WaitForResource(resource_lock) => {
// If we don't poll the lock after waiting for it, we will get stuck
match resource_lock.poll_lock() {
futures::Async::Ready(_) => {}
futures::Async::NotReady => {
trace!(
"<{}> Woke while waiting for resource but it's still not ready",
self.id
);
return Ok(futures::Async::NotReady);
}
}
trace!(
"<{}> Woke while waiting for resource, now trying to dispatch",
self.id
);
self.state = AcquireResourcesState::WaitForDispatch(
self.dispatcher.dispatch_lock().clone(),
);
}
// This state is here to catch if we try to poll in a completed state
AcquireResourcesState::Finished => unreachable!(),
}
}
}
}